1use std::{
2 collections::{HashMap, HashSet},
3 fmt::{Display, Formatter},
4 time::Duration,
5};
6
7use chrono::{Duration as ChronoDuration, Local, NaiveDateTime};
8use futures03::{
9 future::{join_all, try_join_all},
10 stream::FuturesUnordered,
11 StreamExt,
12};
13use serde::{Deserialize, Serialize};
14use thiserror::Error;
15use tokio::{
16 sync::{
17 mpsc::{self, Receiver},
18 oneshot,
19 },
20 task::JoinHandle,
21 time::timeout,
22};
23use tracing::{debug, error, info, trace, warn};
24use tycho_common::{
25 display::opt,
26 dto::{Block, ExtractorIdentity},
27 Bytes,
28};
29
30use crate::feed::{
31 block_history::{BlockHistory, BlockHistoryError, BlockPosition},
32 synchronizer::{StateSyncMessage, StateSynchronizer, SyncResult, SynchronizerError},
33};
34
35mod block_history;
36pub mod component_tracker;
37pub mod synchronizer;
38
39pub trait HeaderLike {
44 fn block(self) -> Option<BlockHeader>;
45 fn block_number_or_timestamp(self) -> u64;
46}
47
48#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, Eq, Hash)]
49pub struct BlockHeader {
50 pub hash: Bytes,
51 pub number: u64,
52 pub parent_hash: Bytes,
53 pub revert: bool,
54 pub timestamp: u64,
55}
56
57impl Display for BlockHeader {
58 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59 let short_hash = if self.hash.len() >= 4 {
61 hex::encode(&self.hash[..4]) } else {
63 hex::encode(&self.hash)
64 };
65
66 write!(f, "Block #{} [0x{}..]", self.number, short_hash)
67 }
68}
69
70impl BlockHeader {
71 fn from_block(block: &Block, revert: bool) -> Self {
72 Self {
73 hash: block.hash.clone(),
74 number: block.number,
75 parent_hash: block.parent_hash.clone(),
76 revert,
77 timestamp: block.ts.and_utc().timestamp() as u64,
78 }
79 }
80}
81
82impl HeaderLike for BlockHeader {
83 fn block(self) -> Option<BlockHeader> {
84 Some(self)
85 }
86
87 fn block_number_or_timestamp(self) -> u64 {
88 self.number
89 }
90}
91
92#[derive(Error, Debug)]
93pub enum BlockSynchronizerError {
94 #[error("Failed to initialize synchronizer: {0}")]
95 InitializationError(#[from] SynchronizerError),
96
97 #[error("Failed to process new block: {0}")]
98 BlockHistoryError(#[from] BlockHistoryError),
99
100 #[error("Not a single synchronizer was ready: {0}")]
101 NoReadySynchronizers(String),
102
103 #[error("No synchronizers were set")]
104 NoSynchronizers,
105
106 #[error("Failed to convert duration: {0}")]
107 DurationConversionError(String),
108}
109
110type BlockSyncResult<T> = Result<T, BlockSynchronizerError>;
111
112pub struct BlockSynchronizer<S> {
160 synchronizers: Option<HashMap<ExtractorIdentity, S>>,
161 block_time: std::time::Duration,
163 latency_buffer: std::time::Duration,
165 startup_timeout: std::time::Duration,
167 max_messages: Option<usize>,
169 max_missed_blocks: u64,
171}
172
173#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
174#[serde(tag = "status", rename_all = "lowercase")]
175pub enum SynchronizerState {
176 Started,
178 Ready(BlockHeader),
180 Delayed(BlockHeader),
183 Stale(BlockHeader),
187 Advanced(BlockHeader),
192 Ended(String),
194}
195
196impl Display for SynchronizerState {
197 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
198 match self {
199 SynchronizerState::Started => write!(f, "Started"),
200 SynchronizerState::Ready(b) => write!(f, "Started({})", b.number),
201 SynchronizerState::Delayed(b) => write!(f, "Delayed({})", b.number),
202 SynchronizerState::Stale(b) => write!(f, "Stale({})", b.number),
203 SynchronizerState::Advanced(b) => write!(f, "Advanced({})", b.number),
204 SynchronizerState::Ended(reason) => write!(f, "Ended({})", reason),
205 }
206 }
207}
208
209pub struct SynchronizerStream {
210 extractor_id: ExtractorIdentity,
211 state: SynchronizerState,
212 error: Option<SynchronizerError>,
213 modify_ts: NaiveDateTime,
214 rx: Receiver<SyncResult<StateSyncMessage<BlockHeader>>>,
215}
216
217impl SynchronizerStream {
218 fn new(
219 extractor_id: &ExtractorIdentity,
220 rx: Receiver<SyncResult<StateSyncMessage<BlockHeader>>>,
221 ) -> Self {
222 Self {
223 extractor_id: extractor_id.clone(),
224 state: SynchronizerState::Started,
225 error: None,
226 modify_ts: Local::now().naive_utc(),
227 rx,
228 }
229 }
230 async fn try_advance(
231 &mut self,
232 block_history: &BlockHistory,
233 block_time: std::time::Duration,
234 latency_buffer: std::time::Duration,
235 stale_threshold: std::time::Duration,
236 ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
237 let extractor_id = self.extractor_id.clone();
238 let latest_block = block_history.latest();
239
240 match &self.state {
241 SynchronizerState::Started | SynchronizerState::Ended(_) => {
242 warn!(state=?&self.state, "Advancing Synchronizer in this state not supported!");
243 Ok(None)
244 }
245 SynchronizerState::Advanced(b) => {
246 let future_block = b.clone();
247 self.transition(future_block, block_history, stale_threshold)?;
249 Ok(None)
250 }
251 SynchronizerState::Ready(previous_block) => {
252 self.try_recv_next_expected(
254 block_time + latency_buffer,
255 block_history,
256 previous_block.clone(),
257 stale_threshold,
258 )
259 .await
260 }
261 SynchronizerState::Delayed(old_block) => {
262 debug!(
264 %old_block,
265 latest_block=opt(&latest_block),
266 %extractor_id,
267 "Trying to catch up to latest block"
268 );
269 self.try_catch_up(block_history, block_time + latency_buffer, stale_threshold)
270 .await
271 }
272 SynchronizerState::Stale(old_block) => {
273 debug!(
275 %old_block,
276 latest_block=opt(&latest_block),
277 %extractor_id,
278 "Trying to catch up stale synchronizer to latest block"
279 );
280 self.try_catch_up(block_history, block_time, stale_threshold)
281 .await
282 }
283 }
284 }
285
286 async fn try_recv_next_expected(
294 &mut self,
295 max_wait: std::time::Duration,
296 block_history: &BlockHistory,
297 previous_block: BlockHeader,
298 stale_threshold: std::time::Duration,
299 ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
300 let extractor_id = self.extractor_id.clone();
301 match timeout(max_wait, self.rx.recv()).await {
302 Ok(Some(Ok(msg))) => {
303 self.transition(msg.header.clone(), block_history, stale_threshold)?;
304 Ok(Some(msg))
305 }
306 Ok(Some(Err(e))) => {
307 self.mark_errored(e);
309 Ok(None)
310 }
311 Ok(None) => {
312 warn!(
315 %extractor_id,
316 "Tried to poll from closed synchronizer.",
317 );
318 self.mark_closed();
319 Ok(None)
320 }
321 Err(_) => {
322 debug!(%extractor_id, %previous_block, "No block received within time limit.");
324 self.state = SynchronizerState::Delayed(previous_block.clone());
327 self.modify_ts = Local::now().naive_utc();
328 Ok(None)
329 }
330 }
331 }
332
333 async fn try_catch_up(
339 &mut self,
340 block_history: &BlockHistory,
341 max_wait: std::time::Duration,
342 stale_threshold: std::time::Duration,
343 ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
344 let mut results = Vec::new();
345 let extractor_id = self.extractor_id.clone();
346
347 let deadline = std::time::Instant::now() + max_wait;
349
350 while std::time::Instant::now() < deadline {
351 match timeout(
352 deadline.saturating_duration_since(std::time::Instant::now()),
353 self.rx.recv(),
354 )
355 .await
356 {
357 Ok(Some(Ok(msg))) => {
358 debug!(%extractor_id, block=%msg.header, "Received new message during catch-up");
359 let block_pos = block_history.determine_block_position(&msg.header)?;
360 results.push(msg);
361 if matches!(block_pos, BlockPosition::NextExpected) {
362 break;
363 }
364 }
365 Ok(Some(Err(e))) => {
366 self.mark_errored(e);
368 return Ok(None);
369 }
370 Ok(None) => {
371 warn!(
374 %extractor_id,
375 "Tried to poll from closed synchronizer during catch up.",
376 );
377 self.mark_closed();
378 return Ok(None)
379 }
380 Err(_) => {
381 debug!(%extractor_id, "Timed out waiting for catch-up");
382 break;
383 }
384 }
385 }
386
387 let merged = results
388 .into_iter()
389 .reduce(|l, r| l.merge(r));
390
391 if let Some(msg) = merged {
392 debug!(%extractor_id, "Delayed extractor made progress!");
394 self.transition(msg.header.clone(), block_history, stale_threshold)?;
395 Ok(Some(msg))
396 } else {
397 self.check_and_transition_to_stale_if_needed(stale_threshold, None)?;
399 Ok(None)
400 }
401 }
402
403 fn check_and_transition_to_stale_if_needed(
405 &mut self,
406 stale_threshold: std::time::Duration,
407 fallback_header: Option<BlockHeader>,
408 ) -> Result<bool, BlockSynchronizerError> {
409 let now = Local::now().naive_utc();
410 let wait_duration = now.signed_duration_since(self.modify_ts);
411 let stale_threshold_chrono = ChronoDuration::from_std(stale_threshold)
412 .map_err(|e| BlockSynchronizerError::DurationConversionError(e.to_string()))?;
413
414 if wait_duration > stale_threshold_chrono {
415 let header_to_use = match (&self.state, fallback_header) {
416 (SynchronizerState::Ready(h), _) |
417 (SynchronizerState::Delayed(h), _) |
418 (SynchronizerState::Stale(h), _) => h.clone(),
419 (_, Some(h)) => h,
420 _ => BlockHeader::default(),
421 };
422
423 warn!(
424 extractor_id=%self.extractor_id,
425 last_message_at=?self.modify_ts,
426 "SynchronizerStream transition to stale due to timeout."
427 );
428 self.state = SynchronizerState::Stale(header_to_use);
429 self.modify_ts = now;
430 Ok(true)
431 } else {
432 Ok(false)
433 }
434 }
435
436 fn transition(
443 &mut self,
444 latest_retrieved: BlockHeader,
445 block_history: &BlockHistory,
446 stale_threshold: std::time::Duration,
447 ) -> Result<(), BlockSynchronizerError> {
448 let extractor_id = self.extractor_id.clone();
449 let last_message_at = self.modify_ts;
450 let block = &latest_retrieved;
451
452 match block_history.determine_block_position(&latest_retrieved)? {
453 BlockPosition::NextExpected => {
454 self.state = SynchronizerState::Ready(latest_retrieved.clone());
455 trace!(
456 next = %latest_retrieved,
457 extractor = %extractor_id,
458 "SynchronizerStream transition to next expected"
459 )
460 }
461 BlockPosition::Latest | BlockPosition::Delayed => {
462 if !self.check_and_transition_to_stale_if_needed(
463 stale_threshold,
464 Some(latest_retrieved.clone()),
465 )? {
466 warn!(
467 %extractor_id,
468 ?last_message_at,
469 %block,
470 "SynchronizerStream transition transition to delayed."
471 );
472 self.state = SynchronizerState::Delayed(latest_retrieved.clone());
473 }
474 }
475 BlockPosition::Advanced => {
476 info!(
477 %extractor_id,
478 ?last_message_at,
479 latest = opt(&block_history.latest()),
480 %block,
481 "SynchronizerStream transition to advanced."
482 );
483 self.state = SynchronizerState::Advanced(latest_retrieved.clone());
484 }
485 }
486 self.modify_ts = Local::now().naive_utc();
487 Ok(())
488 }
489
490 fn mark_errored(&mut self, error: SynchronizerError) {
495 self.state = SynchronizerState::Ended(error.to_string());
496 self.modify_ts = Local::now().naive_utc();
497 self.error = Some(error);
498 }
499
500 fn mark_closed(&mut self) {
506 if !matches!(self.state, SynchronizerState::Ended(_)) {
507 self.state = SynchronizerState::Ended("Closed".to_string());
508 self.modify_ts = Local::now().naive_utc();
509 }
510 }
511
512 fn mark_stale(&mut self, header: &BlockHeader) {
514 self.state = SynchronizerState::Stale(header.clone());
515 self.modify_ts = Local::now().naive_utc();
516 }
517
518 fn mark_ready(&mut self, header: &BlockHeader) {
520 self.state = SynchronizerState::Ready(header.clone());
521 self.modify_ts = Local::now().naive_utc();
522 }
523
524 fn has_ended(&self) -> bool {
525 matches!(self.state, SynchronizerState::Ended(_))
526 }
527
528 fn is_stale(&self) -> bool {
529 matches!(self.state, SynchronizerState::Stale(_))
530 }
531
532 fn is_advanced(&self) -> bool {
533 matches!(self.state, SynchronizerState::Advanced(_))
534 }
535
536 fn get_current_header(&self) -> Option<&BlockHeader> {
540 match &self.state {
541 SynchronizerState::Ready(b) |
542 SynchronizerState::Delayed(b) |
543 SynchronizerState::Advanced(b) => Some(b),
544 _ => None,
545 }
546 }
547}
548
549#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
550pub struct FeedMessage<H = BlockHeader>
551where
552 H: HeaderLike,
553{
554 pub state_msgs: HashMap<String, StateSyncMessage<H>>,
555 pub sync_states: HashMap<String, SynchronizerState>,
556}
557
558impl<H> FeedMessage<H>
559where
560 H: HeaderLike,
561{
562 fn new(
563 state_msgs: HashMap<String, StateSyncMessage<H>>,
564 sync_states: HashMap<String, SynchronizerState>,
565 ) -> Self {
566 Self { state_msgs, sync_states }
567 }
568}
569
570impl<S> BlockSynchronizer<S>
571where
572 S: StateSynchronizer,
573{
574 pub fn new(
575 block_time: std::time::Duration,
576 latency_buffer: std::time::Duration,
577 max_missed_blocks: u64,
578 ) -> Self {
579 Self {
580 synchronizers: None,
581 max_messages: None,
582 block_time,
583 latency_buffer,
584 startup_timeout: block_time.mul_f64(max_missed_blocks as f64),
585 max_missed_blocks,
586 }
587 }
588
589 pub fn max_messages(&mut self, val: usize) {
595 self.max_messages = Some(val);
596 }
597
598 pub fn startup_timeout(mut self, val: Duration) {
602 self.startup_timeout = val;
603 }
604
605 pub fn register_synchronizer(mut self, id: ExtractorIdentity, synchronizer: S) -> Self {
606 let mut registered = self.synchronizers.unwrap_or_default();
607 registered.insert(id, synchronizer);
608 self.synchronizers = Some(registered);
609 self
610 }
611
612 #[cfg(test)]
613 pub fn with_short_timeouts() -> Self {
614 Self::new(Duration::from_millis(10), Duration::from_millis(10), 3)
615 }
616
617 async fn cleanup_synchronizers(
620 mut state_sync_tasks: FuturesUnordered<JoinHandle<()>>,
621 sync_close_senders: Vec<oneshot::Sender<()>>,
622 ) {
623 for close_sender in sync_close_senders {
625 let _ = close_sender.send(());
626 }
627
628 let mut completed_tasks = 0;
630 while let Ok(Some(_)) = timeout(Duration::from_secs(5), state_sync_tasks.next()).await {
631 completed_tasks += 1;
632 }
633
634 let remaining_tasks = state_sync_tasks.len();
636 if remaining_tasks > 0 {
637 warn!(
638 completed = completed_tasks,
639 timed_out = remaining_tasks,
640 "Some synchronizers timed out during cleanup and may not have shut down cleanly"
641 );
642 }
643 }
644
645 pub async fn run(
650 mut self,
651 ) -> BlockSyncResult<(JoinHandle<()>, Receiver<BlockSyncResult<FeedMessage<BlockHeader>>>)>
652 {
653 trace!("Starting BlockSynchronizer...");
654 let state_sync_tasks = FuturesUnordered::new();
655 let mut synchronizers = self
656 .synchronizers
657 .take()
658 .ok_or(BlockSynchronizerError::NoSynchronizers)?;
659 let init_tasks = synchronizers
661 .values_mut()
662 .map(|s| s.initialize())
663 .collect::<Vec<_>>();
664 try_join_all(init_tasks).await?;
665
666 let mut sync_streams = HashMap::with_capacity(synchronizers.len());
667 let mut sync_close_senders = Vec::new();
668 for (extractor_id, synchronizer) in synchronizers.drain() {
669 let (handle, rx) = synchronizer.start().await;
670 let (join_handle, close_sender) = handle.split();
671 state_sync_tasks.push(join_handle);
672 sync_close_senders.push(close_sender);
673
674 sync_streams.insert(extractor_id.clone(), SynchronizerStream::new(&extractor_id, rx));
675 }
676
677 debug!("Waiting for initial synchronizer messages...");
680 let mut startup_futures = Vec::new();
681 for (id, sh) in sync_streams.iter_mut() {
682 let fut = async {
683 let res = timeout(self.startup_timeout, sh.rx.recv()).await;
684 (id.clone(), res)
685 };
686 startup_futures.push(fut);
687 }
688 let mut ready_sync_msgs = HashMap::new();
689 let initial_headers = join_all(startup_futures)
690 .await
691 .into_iter()
692 .filter_map(|(extractor_id, res)| {
693 let synchronizer = sync_streams
694 .get_mut(&extractor_id)
695 .unwrap();
696 match res {
697 Ok(Some(Ok(msg))) => {
698 debug!(%extractor_id, height=?&msg.header.number, "Synchronizer started successfully!");
699 synchronizer.mark_ready(&msg.header);
701 ready_sync_msgs.insert(extractor_id.name.clone(), msg.clone());
702 Some(msg.header)
703 }
704 Ok(Some(Err(e))) => {
705 synchronizer.mark_errored(e);
706 None
707 }
708 Ok(None) => {
709 warn!(%extractor_id, "Synchronizer closed during startup");
714 synchronizer.mark_closed();
715 None
716 }
717 Err(_) => {
718 warn!(%extractor_id, "Timed out waiting for first message");
720 synchronizer.mark_stale(&BlockHeader::default());
721 None
722 }
723 }
724 })
725 .collect::<HashSet<_>>() .into_iter()
727 .collect::<Vec<_>>();
728
729 Self::check_streams(&sync_streams)?;
731 let mut block_history = BlockHistory::new(initial_headers, 15)?;
732 let start_header = block_history
734 .latest()
735 .expect("Safe since we checked streams before");
736 info!(
737 start_block=%start_header,
738 n_healthy=ready_sync_msgs.len(),
739 n_total=sync_streams.len(),
740 "Block synchronization started successfully!"
741 );
742
743 for (_, stream) in sync_streams.iter_mut() {
746 if let SynchronizerState::Ready(header) = stream.state.clone() {
747 if header.number < start_header.number {
748 debug!(
749 extractor_id=%stream.extractor_id,
750 synchronizer_block=header.number,
751 current_block=start_header.number,
752 "Marking synchronizer as delayed during initialization"
753 );
754 stream.state = SynchronizerState::Delayed(header);
755 }
756 }
757 }
758
759 let (sync_tx, sync_rx) = mpsc::channel(30);
760 let main_loop_jh = tokio::spawn(async move {
761 let mut n_iter = 1;
762 loop {
763 let msg = FeedMessage::new(
765 std::mem::take(&mut ready_sync_msgs),
766 sync_streams
767 .iter()
768 .map(|(a, b)| (a.name.to_string(), b.state.clone()))
769 .collect(),
770 );
771 if sync_tx.send(Ok(msg)).await.is_err() {
772 info!("Receiver closed, block synchronizer terminating..");
773 return;
774 };
775
776 if let Some(max_messages) = self.max_messages {
778 if n_iter >= max_messages {
779 info!(max_messages, "StreamEnd");
780 return;
781 }
782 }
783 n_iter += 1;
784
785 let res = self
786 .handle_next_message(
787 &mut sync_streams,
788 &mut ready_sync_msgs,
789 &mut block_history,
790 )
791 .await;
792
793 if let Err(e) = res {
794 let _ = sync_tx.send(Err(e)).await;
796 return;
797 }
798 }
799 });
800
801 let nanny_jh = tokio::spawn(async move {
806 let _ = main_loop_jh.await.map_err(|e| {
808 if e.is_panic() {
809 error!("BlockSynchornizer main loop panicked: {e}")
810 }
811 });
812 debug!("Main loop exited. Closing synchronizers");
813 Self::cleanup_synchronizers(state_sync_tasks, sync_close_senders).await;
814 debug!("Shutdown complete");
815 });
816 Ok((nanny_jh, sync_rx))
817 }
818
819 async fn handle_next_message(
824 &self,
825 sync_streams: &mut HashMap<ExtractorIdentity, SynchronizerStream>,
826 ready_sync_msgs: &mut HashMap<String, StateSyncMessage<BlockHeader>>,
827 block_history: &mut BlockHistory,
828 ) -> BlockSyncResult<()> {
829 let mut recv_futures = Vec::new();
830 for (extractor_id, stream) in sync_streams.iter_mut() {
831 if stream.has_ended() {
834 continue
835 }
836 recv_futures.push(async {
846 let res = stream
847 .try_advance(
848 block_history,
849 self.block_time,
850 self.latency_buffer,
851 self.block_time
852 .mul_f64(self.max_missed_blocks as f64),
853 )
854 .await?;
855 Ok::<_, BlockSynchronizerError>(res.map(|msg| (extractor_id.name.clone(), msg)))
856 });
857 }
858 ready_sync_msgs.extend(
859 join_all(recv_futures)
860 .await
861 .into_iter()
862 .collect::<Result<Vec<_>, _>>()?
863 .into_iter()
864 .flatten(),
865 );
866
867 Self::check_streams(sync_streams)?;
870
871 if sync_streams
874 .values()
875 .any(SynchronizerStream::is_advanced)
876 {
877 *block_history = Self::reinit_block_history(sync_streams, block_history)?;
878 } else {
879 let header = sync_streams
880 .values()
881 .filter_map(SynchronizerStream::get_current_header)
882 .max_by_key(|b| b.number)
883 .expect("Active streams are present, since we checked above");
884 block_history.push(header.clone())?;
885 }
886 Ok(())
887 }
888
889 fn reinit_block_history(
898 sync_streams: &mut HashMap<ExtractorIdentity, SynchronizerStream>,
899 block_history: &mut BlockHistory,
900 ) -> Result<BlockHistory, BlockSynchronizerError> {
901 let previous = block_history.latest().expect(
902 "Old block history should not be empty, startup should have populated it at this point",
903 );
904 let blocks = sync_streams
905 .values()
906 .filter_map(SynchronizerStream::get_current_header)
907 .cloned()
908 .collect();
909 let new_block_history = BlockHistory::new(blocks, 10)?;
910 let latest = block_history
911 .latest()
912 .expect("Block history should not be empty, we just populated it.");
913 info!(
914 %previous,
915 %latest,
916 "Advanced synchronizer detected. Reinitialized block history."
917 );
918 sync_streams
919 .values_mut()
920 .for_each(|stream| {
921 if let Some(header) = stream.get_current_header() {
924 if header.number < latest.number {
925 stream.state = SynchronizerState::Delayed(header.clone());
926 } else if header.number == latest.number {
927 stream.state = SynchronizerState::Ready(header.clone());
928 }
929 }
930 });
931 Ok(new_block_history)
932 }
933
934 fn check_streams(
939 sync_streams: &HashMap<ExtractorIdentity, SynchronizerStream>,
940 ) -> BlockSyncResult<()> {
941 if sync_streams
942 .values()
943 .all(|stream| stream.has_ended() | stream.is_stale())
944 {
945 let mut reason = Vec::new();
946 if let Some((last_errored_id, last_errored_stream)) = sync_streams
947 .iter()
948 .filter(|(_, stream)| stream.has_ended() | stream.is_stale())
949 .max_by_key(|(_, stream)| stream.modify_ts)
950 {
951 if let Some(err) = &last_errored_stream.error {
952 reason.push(format!("Synchronizer for {last_errored_id} errored with: {err}"))
954 } else {
955 reason.push(format!(
957 "Synchronizer for {last_errored_id} became: {}",
958 last_errored_stream.state
959 ))
960 }
961 } else {
962 reason.push(
963 "Can't identify protocol that caused the stream to end! \
964 This condition should be unreachable!"
965 .to_string(),
966 )
967 }
968
969 sync_streams
970 .iter()
971 .for_each(|(id, stream)| {
972 reason
973 .push(format!("{id} reported as {} at {}", stream.state, stream.modify_ts))
974 });
975
976 return Err(BlockSynchronizerError::NoReadySynchronizers(reason.join(", ")));
977 }
978 Ok(())
979 }
980}
981
982#[cfg(test)]
983mod tests {
984 use std::sync::Arc;
985
986 use async_trait::async_trait;
987 use test_log::test;
988 use tokio::sync::{oneshot, Mutex};
989 use tycho_common::dto::Chain;
990
991 use super::*;
992 use crate::feed::synchronizer::{SyncResult, SynchronizerTaskHandle};
993
994 #[derive(Clone, Debug)]
995 enum MockBehavior {
996 Normal, IgnoreClose, ExitImmediately, }
1000
1001 type HeaderReceiver = Receiver<SyncResult<StateSyncMessage<BlockHeader>>>;
1002
1003 #[derive(Clone)]
1004 struct MockStateSync {
1005 header_tx: mpsc::Sender<SyncResult<StateSyncMessage<BlockHeader>>>,
1006 header_rx: Arc<Mutex<Option<HeaderReceiver>>>,
1007 close_received: Arc<Mutex<bool>>,
1008 behavior: MockBehavior,
1009 close_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
1011 }
1012
1013 impl MockStateSync {
1014 fn new() -> Self {
1015 Self::with_behavior(MockBehavior::Normal)
1016 }
1017
1018 fn with_behavior(behavior: MockBehavior) -> Self {
1019 let (tx, rx) = mpsc::channel(1);
1020 Self {
1021 header_tx: tx,
1022 header_rx: Arc::new(Mutex::new(Some(rx))),
1023 close_received: Arc::new(Mutex::new(false)),
1024 behavior,
1025 close_tx: Arc::new(Mutex::new(None)),
1026 }
1027 }
1028
1029 async fn was_close_received(&self) -> bool {
1030 *self.close_received.lock().await
1031 }
1032
1033 async fn send_header(&self, header: StateSyncMessage<BlockHeader>) -> Result<(), String> {
1034 self.header_tx
1035 .send(Ok(header))
1036 .await
1037 .map_err(|e| format!("sending header failed: {e}"))
1038 }
1039
1040 async fn trigger_close(&self) {
1042 if let Some(close_tx) = self.close_tx.lock().await.take() {
1043 let _ = close_tx.send(());
1044 }
1045 }
1046 }
1047
1048 #[async_trait]
1049 impl StateSynchronizer for MockStateSync {
1050 async fn initialize(&mut self) -> SyncResult<()> {
1051 Ok(())
1052 }
1053
1054 async fn start(
1055 mut self,
1056 ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>) {
1057 let block_rx = {
1058 let mut guard = self.header_rx.lock().await;
1059 guard
1060 .take()
1061 .expect("Block receiver was not set!")
1062 };
1063
1064 let (close_tx_for_handle, close_rx) = oneshot::channel();
1067 let (close_tx_for_test, close_rx_for_test) = oneshot::channel();
1068
1069 {
1071 let mut guard = self.close_tx.lock().await;
1072 *guard = Some(close_tx_for_test);
1073 }
1074
1075 let behavior = self.behavior.clone();
1076 let close_received_clone = self.close_received.clone();
1077 let tx = self.header_tx.clone();
1078
1079 let jh = tokio::spawn(async move {
1080 match behavior {
1081 MockBehavior::IgnoreClose => {
1082 loop {
1085 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
1086 }
1087 }
1088 MockBehavior::ExitImmediately => {
1089 tx.send(SyncResult::Err(SynchronizerError::ConnectionError(
1091 "Simulated immediate task failure".to_string(),
1092 )))
1093 .await
1094 .unwrap();
1095 }
1096 MockBehavior::Normal => {
1097 let _ = tokio::select! {
1100 result = close_rx => result,
1101 result = close_rx_for_test => result,
1102 };
1103 let mut guard = close_received_clone.lock().await;
1104 *guard = true;
1105 }
1106 }
1107 });
1108
1109 let handle = SynchronizerTaskHandle::new(jh, close_tx_for_handle);
1110 (handle, block_rx)
1111 }
1112 }
1113
1114 fn header_message(block: u8) -> StateSyncMessage<BlockHeader> {
1115 StateSyncMessage {
1116 header: BlockHeader {
1117 number: block as u64,
1118 hash: Bytes::from(vec![block]),
1119 parent_hash: Bytes::from(vec![block - 1]),
1120 revert: false,
1121 timestamp: 1000,
1122 },
1123 ..Default::default()
1124 }
1125 }
1126
1127 async fn receive_message(rx: &mut Receiver<BlockSyncResult<FeedMessage>>) -> FeedMessage {
1128 timeout(Duration::from_millis(100), rx.recv())
1129 .await
1130 .expect("Responds in time")
1131 .expect("Should receive first message")
1132 .expect("No error")
1133 }
1134
1135 async fn setup_block_sync(
1136 ) -> (MockStateSync, MockStateSync, JoinHandle<()>, Receiver<BlockSyncResult<FeedMessage>>)
1137 {
1138 setup_block_sync_with_behaviour(MockBehavior::Normal, MockBehavior::Normal).await
1139 }
1140
1141 async fn setup_block_sync_with_behaviour(
1143 v2_behavior: MockBehavior,
1144 v3_behavior: MockBehavior,
1145 ) -> (MockStateSync, MockStateSync, JoinHandle<()>, Receiver<BlockSyncResult<FeedMessage>>)
1146 {
1147 let v2_sync = MockStateSync::with_behavior(v2_behavior);
1148 let v3_sync = MockStateSync::with_behavior(v3_behavior);
1149
1150 let mut block_sync = BlockSynchronizer::new(
1152 Duration::from_millis(20), Duration::from_millis(10), 3, );
1156 block_sync.max_messages(10); let block_sync = block_sync
1159 .register_synchronizer(
1160 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
1161 v2_sync.clone(),
1162 )
1163 .register_synchronizer(
1164 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
1165 v3_sync.clone(),
1166 );
1167
1168 let block1_msg = header_message(1);
1170 let _ = v2_sync
1171 .send_header(block1_msg.clone())
1172 .await;
1173 let _ = v3_sync
1174 .send_header(block1_msg.clone())
1175 .await;
1176
1177 let (nanny_handle, mut rx) = block_sync
1179 .run()
1180 .await
1181 .expect("BlockSynchronizer failed to start");
1182
1183 let first_feed_msg = receive_message(&mut rx).await;
1184 assert_eq!(first_feed_msg.state_msgs.len(), 2);
1185 assert!(matches!(
1186 first_feed_msg
1187 .sync_states
1188 .get("uniswap-v2")
1189 .unwrap(),
1190 SynchronizerState::Ready(_)
1191 ));
1192 assert!(matches!(
1193 first_feed_msg
1194 .sync_states
1195 .get("uniswap-v3")
1196 .unwrap(),
1197 SynchronizerState::Ready(_)
1198 ));
1199
1200 (v2_sync, v3_sync, nanny_handle, rx)
1201 }
1202
1203 async fn shutdown_block_synchronizer(
1204 v2_sync: &MockStateSync,
1205 v3_sync: &MockStateSync,
1206 nanny_handle: JoinHandle<()>,
1207 ) {
1208 v3_sync.trigger_close().await;
1209 v2_sync.trigger_close().await;
1210
1211 timeout(Duration::from_millis(100), nanny_handle)
1212 .await
1213 .expect("Nanny failed to exit within time")
1214 .expect("Nanny panicked");
1215 }
1216
1217 #[test(tokio::test)]
1218 async fn test_two_ready_synchronizers() {
1219 let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1220
1221 let second_msg = header_message(2);
1222 v2_sync
1223 .send_header(second_msg.clone())
1224 .await
1225 .expect("send_header failed");
1226 v3_sync
1227 .send_header(second_msg.clone())
1228 .await
1229 .expect("send_header failed");
1230 let second_feed_msg = receive_message(&mut rx).await;
1231
1232 let exp2 = FeedMessage {
1233 state_msgs: [
1234 ("uniswap-v2".to_string(), second_msg.clone()),
1235 ("uniswap-v3".to_string(), second_msg.clone()),
1236 ]
1237 .into_iter()
1238 .collect(),
1239 sync_states: [
1240 ("uniswap-v3".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
1241 ("uniswap-v2".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
1242 ]
1243 .into_iter()
1244 .collect(),
1245 };
1246 assert_eq!(second_feed_msg, exp2);
1247
1248 shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1249 }
1250
1251 #[test(tokio::test)]
1252 async fn test_delayed_synchronizer_catches_up() {
1253 let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1254
1255 let block2_msg = header_message(2);
1257 v2_sync
1258 .send_header(block2_msg.clone())
1259 .await
1260 .expect("send_header failed");
1261
1262 let second_feed_msg = receive_message(&mut rx).await;
1264 debug!("Consumed second message for v2");
1265
1266 assert!(second_feed_msg
1267 .state_msgs
1268 .contains_key("uniswap-v2"));
1269 assert!(matches!(
1270 second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1271 SynchronizerState::Ready(header) if header.number == 2
1272 ));
1273 assert!(!second_feed_msg
1274 .state_msgs
1275 .contains_key("uniswap-v3"));
1276 assert!(matches!(
1277 second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1278 SynchronizerState::Delayed(header) if header.number == 1
1279 ));
1280
1281 v3_sync
1283 .send_header(block2_msg.clone())
1284 .await
1285 .expect("send_header failed");
1286
1287 let block3_msg = header_message(3);
1289 v2_sync
1290 .send_header(block3_msg.clone())
1291 .await
1292 .expect("send_header failed");
1293 v3_sync
1294 .send_header(block3_msg)
1295 .await
1296 .expect("send_header failed");
1297
1298 let mut third_feed_msg = receive_message(&mut rx).await;
1301
1302 if !third_feed_msg
1305 .state_msgs
1306 .contains_key("uniswap-v2")
1307 {
1308 third_feed_msg = rx
1309 .recv()
1310 .await
1311 .expect("header channel was closed")
1312 .expect("no error");
1313 }
1314 assert!(third_feed_msg
1315 .state_msgs
1316 .contains_key("uniswap-v2"));
1317 assert!(third_feed_msg
1318 .state_msgs
1319 .contains_key("uniswap-v3"));
1320 assert!(matches!(
1321 third_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1322 SynchronizerState::Ready(header) if header.number == 3
1323 ));
1324 assert!(matches!(
1325 third_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1326 SynchronizerState::Ready(header) if header.number == 3
1327 ));
1328
1329 shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1330 }
1331
1332 #[test(tokio::test)]
1333 async fn test_different_start_blocks() {
1334 let v2_sync = MockStateSync::new();
1335 let v3_sync = MockStateSync::new();
1336 let block_sync = BlockSynchronizer::with_short_timeouts()
1337 .register_synchronizer(
1338 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
1339 v2_sync.clone(),
1340 )
1341 .register_synchronizer(
1342 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
1343 v3_sync.clone(),
1344 );
1345
1346 let block1_msg = header_message(1);
1348 let block2_msg = header_message(2);
1349
1350 let _ = v2_sync
1351 .send_header(block1_msg.clone())
1352 .await;
1353 v3_sync
1354 .send_header(block2_msg.clone())
1355 .await
1356 .expect("send_header failed");
1357
1358 let (jh, mut rx) = block_sync
1360 .run()
1361 .await
1362 .expect("BlockSynchronizer failed to start.");
1363
1364 let first_feed_msg = receive_message(&mut rx).await;
1366 assert!(matches!(
1367 first_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1368 SynchronizerState::Delayed(header) if header.number == 1
1369 ));
1370 assert!(matches!(
1371 first_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1372 SynchronizerState::Ready(header) if header.number == 2
1373 ));
1374
1375 v2_sync
1377 .send_header(block2_msg.clone())
1378 .await
1379 .expect("send_header failed");
1380
1381 let block3_msg = header_message(3);
1383 let _ = v2_sync
1384 .send_header(block3_msg.clone())
1385 .await;
1386 v3_sync
1387 .send_header(block3_msg.clone())
1388 .await
1389 .expect("send_header failed");
1390
1391 let second_feed_msg = receive_message(&mut rx).await;
1393 assert_eq!(second_feed_msg.state_msgs.len(), 2);
1394 assert!(matches!(
1395 second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1396 SynchronizerState::Ready(header) if header.number == 3
1397 ));
1398 assert!(matches!(
1399 second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1400 SynchronizerState::Ready(header) if header.number == 3
1401 ));
1402
1403 shutdown_block_synchronizer(&v2_sync, &v3_sync, jh).await;
1404 }
1405
1406 #[test(tokio::test)]
1407 async fn test_synchronizer_fails_other_goes_stale() {
1408 let (_v2_sync, v3_sync, nanny_handle, mut sync_rx) =
1409 setup_block_sync_with_behaviour(MockBehavior::ExitImmediately, MockBehavior::Normal)
1410 .await;
1411
1412 let mut error_reported = false;
1413 for _ in 0..3 {
1414 if let Some(msg) = sync_rx.recv().await {
1415 match msg {
1416 Err(_) => error_reported = true,
1417 Ok(msg) => {
1418 assert!(matches!(
1419 msg.sync_states
1420 .get("uniswap-v3")
1421 .unwrap(),
1422 SynchronizerState::Delayed(_)
1423 ));
1424 assert!(matches!(
1425 msg.sync_states
1426 .get("uniswap-v2")
1427 .unwrap(),
1428 SynchronizerState::Ended(_)
1429 ));
1430 }
1431 }
1432 }
1433 }
1434 assert!(error_reported, "BlockSynchronizer did not report final error");
1435
1436 let result = timeout(Duration::from_secs(2), nanny_handle).await;
1438 assert!(result.is_ok(), "Nanny should complete when synchronizer task exits");
1439
1440 assert!(
1442 v3_sync.was_close_received().await,
1443 "v3_sync should have received close signal during cleanup"
1444 );
1445 }
1446
1447 #[test(tokio::test)]
1448 async fn test_cleanup_timeout_warning() {
1449 let (_v2_sync, _v3_sync, nanny_handle, _rx) = setup_block_sync_with_behaviour(
1452 MockBehavior::ExitImmediately,
1453 MockBehavior::IgnoreClose,
1454 )
1455 .await;
1456
1457 let result = timeout(Duration::from_secs(10), nanny_handle).await;
1459 assert!(
1460 result.is_ok(),
1461 "Nanny should complete even when some synchronizers timeout during cleanup"
1462 );
1463
1464 }
1468
1469 #[test(tokio::test)]
1470 async fn test_one_synchronizer_goes_stale_while_other_works() {
1471 let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1473
1474 let block2_msg = header_message(2);
1476 let _ = v3_sync
1477 .send_header(block2_msg.clone())
1478 .await;
1479 let second_feed_msg = receive_message(&mut rx).await;
1483 assert!(second_feed_msg
1484 .state_msgs
1485 .contains_key("uniswap-v3"));
1486 assert!(!second_feed_msg
1487 .state_msgs
1488 .contains_key("uniswap-v2"));
1489 assert!(matches!(
1490 second_feed_msg
1491 .sync_states
1492 .get("uniswap-v3")
1493 .unwrap(),
1494 SynchronizerState::Ready(_)
1495 ));
1496 if let Some(v2_state) = second_feed_msg
1498 .sync_states
1499 .get("uniswap-v2")
1500 {
1501 if matches!(v2_state, SynchronizerState::Delayed(_)) {
1502 assert!(
1504 !nanny_handle.is_finished(),
1505 "Nanny should still be running when synchronizer is delayed (not stale yet)"
1506 );
1507 }
1508 }
1509
1510 tokio::time::sleep(Duration::from_millis(15)).await;
1512
1513 let block3_msg = header_message(3);
1515 let _ = v3_sync
1516 .send_header(block3_msg.clone())
1517 .await;
1518
1519 tokio::time::sleep(Duration::from_millis(40)).await;
1520
1521 let mut stale_found = false;
1522 for _ in 0..2 {
1523 if let Some(Ok(msg)) = rx.recv().await {
1524 if let Some(SynchronizerState::Stale(_)) = msg.sync_states.get("uniswap-v2") {
1525 stale_found = true;
1526 }
1527 }
1528 }
1529 assert!(stale_found, "v2 synchronizer should be stale");
1530
1531 shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1532 }
1533
1534 #[test(tokio::test)]
1535 async fn test_all_synchronizers_go_stale_main_loop_exits() {
1536 let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1538
1539 let mut seen_delayed = false;
1544
1545 let timeout_duration = Duration::from_millis(500); let start_time = tokio::time::Instant::now();
1549
1550 while let Ok(Some(Ok(msg))) =
1551 tokio::time::timeout(Duration::from_millis(50), rx.recv()).await
1552 {
1553 if !seen_delayed {
1555 let v2_state = msg.sync_states.get("uniswap-v2");
1556 let v3_state = msg.sync_states.get("uniswap-v3");
1557
1558 if matches!(v2_state, Some(SynchronizerState::Delayed(_))) ||
1559 matches!(v3_state, Some(SynchronizerState::Delayed(_)))
1560 {
1561 seen_delayed = true;
1562 assert!(!nanny_handle.is_finished(),
1564 "Nanny should still be running when synchronizers are delayed (not stale yet)");
1565 break;
1567 }
1568 }
1569
1570 if start_time.elapsed() > timeout_duration {
1572 break;
1573 }
1574 }
1575 assert!(seen_delayed, "Synchronizers should transition to Delayed state first");
1577
1578 let mut error_reported = false;
1579 while let Some(msg) = rx.recv().await {
1581 if let Err(e) = msg {
1582 assert!(e
1583 .to_string()
1584 .contains("became: Stale(1)"));
1585 assert!(e
1586 .to_string()
1587 .contains("reported as Stale(1)"));
1588 error_reported = true;
1589 }
1590 }
1591 assert!(error_reported, "Expected the channel to report an error before closing");
1592
1593 let nanny_result = timeout(Duration::from_secs(2), nanny_handle).await;
1595 assert!(nanny_result.is_ok(), "Nanny should complete when main loop exits");
1596
1597 assert!(
1599 v2_sync.was_close_received().await,
1600 "v2_sync should have received close signal during cleanup"
1601 );
1602 assert!(
1603 v3_sync.was_close_received().await,
1604 "v3_sync should have received close signal during cleanup"
1605 );
1606 }
1607
1608 #[test(tokio::test)]
1609 async fn test_stale_synchronizer_recovers() {
1610 let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1612
1613 tokio::time::sleep(Duration::from_millis(50)).await;
1615 let block2_msg = header_message(2);
1616 let _ = v2_sync
1617 .send_header(block2_msg.clone())
1618 .await;
1619
1620 for _ in 0..2 {
1622 if let Some(msg) = rx.recv().await {
1623 if let Ok(msg) = msg {
1624 if matches!(
1625 msg.sync_states
1626 .get("uniswap-v2")
1627 .unwrap(),
1628 SynchronizerState::Ready(_)
1629 ) {
1630 assert!(matches!(
1631 msg.sync_states
1632 .get("uniswap-v3")
1633 .unwrap(),
1634 SynchronizerState::Delayed(_)
1635 ));
1636 break;
1637 };
1638 }
1639 } else {
1640 panic!("Channel closed unexpectedly")
1641 }
1642 }
1643
1644 tokio::time::sleep(Duration::from_millis(15)).await;
1646 let block3_msg = header_message(3);
1647 let _ = v2_sync
1648 .send_header(block3_msg.clone())
1649 .await;
1650 let third_msg = receive_message(&mut rx).await;
1651 dbg!(&third_msg);
1652 assert!(matches!(
1653 third_msg
1654 .sync_states
1655 .get("uniswap-v2")
1656 .unwrap(),
1657 SynchronizerState::Ready(_)
1658 ));
1659 assert!(matches!(
1660 third_msg
1661 .sync_states
1662 .get("uniswap-v3")
1663 .unwrap(),
1664 SynchronizerState::Stale(_)
1665 ));
1666
1667 let block4_msg = header_message(4);
1668 let _ = v3_sync
1669 .send_header(block2_msg.clone())
1670 .await;
1671 let _ = v3_sync
1672 .send_header(block3_msg.clone())
1673 .await;
1674 let _ = v3_sync
1675 .send_header(block4_msg.clone())
1676 .await;
1677 let _ = v2_sync
1678 .send_header(block4_msg.clone())
1679 .await;
1680 let fourth_msg = receive_message(&mut rx).await;
1681 assert!(matches!(
1682 fourth_msg
1683 .sync_states
1684 .get("uniswap-v2")
1685 .unwrap(),
1686 SynchronizerState::Ready(_)
1687 ));
1688 assert!(matches!(
1689 fourth_msg
1690 .sync_states
1691 .get("uniswap-v3")
1692 .unwrap(),
1693 SynchronizerState::Ready(_)
1694 ));
1695
1696 shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1697
1698 assert!(
1700 v2_sync.was_close_received().await,
1701 "v2_sync should have received close signal during cleanup"
1702 );
1703 assert!(
1704 v3_sync.was_close_received().await,
1705 "v3_sync should have received close signal during cleanup"
1706 );
1707 }
1708
1709 #[test(tokio::test)]
1710 async fn test_all_synchronizer_advanced() {
1711 let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1715
1716 let block3 = header_message(3);
1717 v2_sync
1718 .send_header(block3.clone())
1719 .await
1720 .unwrap();
1721 v3_sync
1722 .send_header(block3)
1723 .await
1724 .unwrap();
1725
1726 let msg = receive_message(&mut rx).await;
1727 matches!(
1728 msg.sync_states
1729 .get("uniswap-v2")
1730 .unwrap(),
1731 SynchronizerState::Ready(_)
1732 );
1733 matches!(
1734 msg.sync_states
1735 .get("uniswap-v3")
1736 .unwrap(),
1737 SynchronizerState::Ready(_)
1738 );
1739
1740 shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1741 }
1742
1743 #[test(tokio::test)]
1744 async fn test_one_synchronizer_advanced() {
1745 let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1746
1747 let block2 = header_message(2);
1748 let block4 = header_message(4);
1749 v2_sync
1750 .send_header(block4.clone())
1751 .await
1752 .unwrap();
1753 v3_sync
1754 .send_header(block2.clone())
1755 .await
1756 .unwrap();
1757
1758 let msg = receive_message(&mut rx).await;
1759 matches!(
1760 msg.sync_states
1761 .get("uniswap-v2")
1762 .unwrap(),
1763 SynchronizerState::Ready(_)
1764 );
1765 matches!(
1766 msg.sync_states
1767 .get("uniswap-v3")
1768 .unwrap(),
1769 SynchronizerState::Delayed(_)
1770 );
1771
1772 shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1773 }
1774}