1use std::{
2 collections::{HashMap, HashSet},
3 fmt::{Display, Formatter},
4 time::Duration,
5};
6
7use chrono::{Duration as ChronoDuration, Local, NaiveDateTime};
8use futures03::{
9 future::{join_all, try_join_all},
10 stream::FuturesUnordered,
11 StreamExt,
12};
13use serde::{Deserialize, Serialize};
14use thiserror::Error;
15use tokio::{
16 sync::{
17 mpsc::{self, Receiver},
18 oneshot,
19 },
20 task::JoinHandle,
21 time::timeout,
22};
23use tracing::{debug, error, info, trace, warn};
24use tycho_common::{
25 display::opt,
26 dto::{Block, ExtractorIdentity},
27 Bytes,
28};
29
30use crate::feed::{
31 block_history::{BlockHistory, BlockHistoryError, BlockPosition},
32 synchronizer::{StateSyncMessage, StateSynchronizer, SyncResult, SynchronizerError},
33};
34
35mod block_history;
36pub mod component_tracker;
37pub mod synchronizer;
38
/// Abstraction over message headers that can be related to a block.
pub trait HeaderLike {
    /// Consumes the header and returns the associated [`BlockHeader`], if there is one.
    fn block(self) -> Option<BlockHeader>;

    /// Consumes the header and returns a `u64` key for it.
    ///
    /// NOTE(review): going by the name, implementors without a block number are expected to
    /// return a timestamp instead; confirm against the other implementors.
    fn block_number_or_timestamp(self) -> u64;
}
47
/// Minimal description of a block as tracked by the synchronizers.
#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, Eq, Hash)]
pub struct BlockHeader {
    /// Hash of the block.
    pub hash: Bytes,
    /// Block number (height).
    pub number: u64,
    /// Hash of the parent block.
    pub parent_hash: Bytes,
    /// Revert flag attached to the header; supplied by the caller of `from_block`.
    pub revert: bool,
    /// Block timestamp; `from_block` fills it with the UTC unix timestamp (seconds) of the
    /// block.
    pub timestamp: u64,
}
56
57impl Display for BlockHeader {
58 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59 let short_hash = if self.hash.len() >= 4 {
61 hex::encode(&self.hash[..4]) } else {
63 hex::encode(&self.hash)
64 };
65
66 write!(f, "Block #{} [0x{}..]", self.number, short_hash)
67 }
68}
69
70impl BlockHeader {
71 fn from_block(block: &Block, revert: bool) -> Self {
72 Self {
73 hash: block.hash.clone(),
74 number: block.number,
75 parent_hash: block.parent_hash.clone(),
76 revert,
77 timestamp: block.ts.and_utc().timestamp() as u64,
78 }
79 }
80}
81
82impl HeaderLike for BlockHeader {
83 fn block(self) -> Option<BlockHeader> {
84 Some(self)
85 }
86
87 fn block_number_or_timestamp(self) -> u64 {
88 self.number
89 }
90}
91
/// Errors produced by the [`BlockSynchronizer`] and its streams.
#[derive(Error, Debug)]
pub enum BlockSynchronizerError {
    /// A [`SynchronizerError`] surfaced, e.g. while initializing the registered synchronizers.
    #[error("Failed to initialize synchronizer: {0}")]
    InitializationError(#[from] SynchronizerError),

    /// The block history rejected or could not classify a block.
    #[error("Failed to process new block: {0}")]
    BlockHistoryError(#[from] BlockHistoryError),

    /// Every stream is either ended or stale; the payload is a human readable summary.
    #[error("Not a single synchronizer was ready: {0}")]
    NoReadySynchronizers(String),

    /// No synchronizer was registered before running (or the list of streams is empty).
    #[error("No synchronizers were set")]
    NoSynchronizers,

    /// A `std::time::Duration` could not be converted into a chrono duration.
    #[error("Failed to convert duration: {0}")]
    DurationConversionError(String),
}
109
/// Result alias whose error type is [`BlockSynchronizerError`].
type BlockSyncResult<T> = Result<T, BlockSynchronizerError>;
111
/// Aligns the messages of several state synchronizers on a block-by-block basis.
pub struct BlockSynchronizer<S> {
    /// Registered synchronizers keyed by extractor; taken out (left as `None`) by `run`.
    synchronizers: Option<HashMap<ExtractorIdentity, S>>,
    /// Expected time between two blocks.
    block_time: std::time::Duration,
    /// Extra time added on top of `block_time` when waiting for ready or delayed streams.
    latency_buffer: std::time::Duration,
    /// Maximum time to wait for the first message of each synchronizer during startup.
    startup_timeout: std::time::Duration,
    /// Optional cap on the number of feed messages emitted before the main loop stops.
    max_messages: Option<usize>,
    /// Multiplied with `block_time` to derive the threshold after which a stream is
    /// considered stale.
    max_missed_blocks: u64,
}
172
/// State of a single synchronizer stream as tracked by the block synchronizer.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "lowercase")]
pub enum SynchronizerState {
    /// Initial state of a freshly created stream, before any message was received.
    Started,
    /// The stream delivered the block that was expected next.
    Ready(BlockHeader),
    /// The stream is behind: it timed out or delivered a block that is not the next expected
    /// one. The header is the last block known for the stream.
    Delayed(BlockHeader),
    /// The stream made no progress for longer than the stale threshold (or never delivered a
    /// first message during startup).
    Stale(BlockHeader),
    /// The stream delivered a block that the block history classifies as advanced, i.e. ahead
    /// of what the history expects.
    Advanced(BlockHeader),
    /// The stream terminated; the payload is the error message or `"Closed"`.
    Ended(String),
}
195
196impl Display for SynchronizerState {
197 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
198 match self {
199 SynchronizerState::Started => write!(f, "Started"),
200 SynchronizerState::Ready(b) => write!(f, "Started({})", b.number),
201 SynchronizerState::Delayed(b) => write!(f, "Delayed({})", b.number),
202 SynchronizerState::Stale(b) => write!(f, "Stale({})", b.number),
203 SynchronizerState::Advanced(b) => write!(f, "Advanced({})", b.number),
204 SynchronizerState::Ended(reason) => write!(f, "Ended({})", reason),
205 }
206 }
207}
208
/// Receiving end of a single synchronizer together with its tracked state.
pub struct SynchronizerStream {
    /// Identity of the extractor this stream belongs to.
    extractor_id: ExtractorIdentity,
    /// Current state of the stream.
    state: SynchronizerState,
    /// Error reported by the synchronizer, if it ended with one.
    error: Option<SynchronizerError>,
    /// Timestamp of the last state update.
    modify_ts: NaiveDateTime,
    /// Channel on which the synchronizer delivers its messages.
    rx: Receiver<SyncResult<StateSyncMessage<BlockHeader>>>,
}
216
217impl SynchronizerStream {
218 fn new(
219 extractor_id: &ExtractorIdentity,
220 rx: Receiver<SyncResult<StateSyncMessage<BlockHeader>>>,
221 ) -> Self {
222 Self {
223 extractor_id: extractor_id.clone(),
224 state: SynchronizerState::Started,
225 error: None,
226 modify_ts: Local::now().naive_utc(),
227 rx,
228 }
229 }
    /// Tries to move this stream forward by one step, depending on its current state.
    ///
    /// * `Started` / `Ended`: not supported, logs a warning and returns `Ok(None)`.
    /// * `Advanced`: re-evaluates the stored block against `block_history`; no message is
    ///   consumed.
    /// * `Ready`: waits up to `block_time + latency_buffer` for the next message.
    /// * `Delayed`: tries to catch up, waiting up to `block_time + latency_buffer`.
    /// * `Stale`: tries to catch up, waiting up to `block_time` only.
    ///
    /// Returns the received (possibly merged) message, or `None` if nothing was received.
    async fn try_advance(
        &mut self,
        block_history: &BlockHistory,
        block_time: std::time::Duration,
        latency_buffer: std::time::Duration,
        stale_threshold: std::time::Duration,
    ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
        let extractor_id = self.extractor_id.clone();
        let latest_block = block_history.latest();

        match &self.state {
            SynchronizerState::Started | SynchronizerState::Ended(_) => {
                warn!(state=?&self.state, "Advancing Synchronizer in this state not supported!");
                Ok(None)
            }
            SynchronizerState::Advanced(b) => {
                // Clone first so the borrow of `self.state` ends before `transition` mutates it.
                let future_block = b.clone();
                self.transition(future_block, block_history, stale_threshold)?;
                Ok(None)
            }
            SynchronizerState::Ready(previous_block) => {
                self.try_recv_next_expected(
                    block_time + latency_buffer,
                    block_history,
                    previous_block.clone(),
                    stale_threshold,
                )
                .await
            }
            SynchronizerState::Delayed(old_block) => {
                debug!(
                    %old_block,
                    latest_block=opt(&latest_block),
                    %extractor_id,
                    "Trying to catch up to latest block"
                );
                self.try_catch_up(block_history, block_time + latency_buffer, stale_threshold)
                    .await
            }
            SynchronizerState::Stale(old_block) => {
                debug!(
                    %old_block,
                    latest_block=opt(&latest_block),
                    %extractor_id,
                    "Trying to catch up stale synchronizer to latest block"
                );
                // Stale streams get no latency buffer on top of the block time.
                self.try_catch_up(block_history, block_time, stale_threshold)
                    .await
            }
        }
    }
285
286 async fn try_recv_next_expected(
294 &mut self,
295 max_wait: std::time::Duration,
296 block_history: &BlockHistory,
297 previous_block: BlockHeader,
298 stale_threshold: std::time::Duration,
299 ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
300 let extractor_id = self.extractor_id.clone();
301 match timeout(max_wait, self.rx.recv()).await {
302 Ok(Some(Ok(msg))) => {
303 self.transition(msg.header.clone(), block_history, stale_threshold)?;
304 Ok(Some(msg))
305 }
306 Ok(Some(Err(e))) => {
307 self.mark_errored(e);
309 Ok(None)
310 }
311 Ok(None) => {
312 warn!(
315 %extractor_id,
316 "Tried to poll from closed synchronizer.",
317 );
318 self.mark_closed();
319 Ok(None)
320 }
321 Err(_) => {
322 debug!(%extractor_id, %previous_block, "No block received within time limit.");
324 self.state = SynchronizerState::Delayed(previous_block.clone());
327 self.modify_ts = Local::now().naive_utc();
328 Ok(None)
329 }
330 }
331 }
332
    /// Drains messages from a lagging stream for at most `max_wait`, trying to reach the block
    /// that `block_history` expects next.
    ///
    /// All messages received in that window are combined with `merge` into a single message.
    /// If at least one message arrived, the state is updated via `transition` using the merged
    /// message's header and the merged message is returned. Otherwise the stream is checked
    /// for staleness and `Ok(None)` is returned.
    ///
    /// An error or a closed channel ends the stream and returns `Ok(None)`; messages collected
    /// up to that point are dropped.
    async fn try_catch_up(
        &mut self,
        block_history: &BlockHistory,
        max_wait: std::time::Duration,
        stale_threshold: std::time::Duration,
    ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
        let mut results = Vec::new();
        let extractor_id = self.extractor_id.clone();

        // A single deadline bounds the whole catch-up, regardless of how many messages arrive.
        let deadline = std::time::Instant::now() + max_wait;

        while std::time::Instant::now() < deadline {
            match timeout(
                // Only wait for whatever is left until the deadline.
                deadline.saturating_duration_since(std::time::Instant::now()),
                self.rx.recv(),
            )
            .await
            {
                Ok(Some(Ok(msg))) => {
                    debug!(%extractor_id, block=%msg.header, "Received new message during catch-up");
                    let block_pos = block_history.determine_block_position(&msg.header)?;
                    results.push(msg);
                    // Caught up: stop draining once the next expected block was received.
                    if matches!(block_pos, BlockPosition::NextExpected) {
                        break;
                    }
                }
                Ok(Some(Err(e))) => {
                    self.mark_errored(e);
                    return Ok(None);
                }
                Ok(None) => {
                    warn!(
                        %extractor_id,
                        "Tried to poll from closed synchronizer during catch up.",
                    );
                    self.mark_closed();
                    return Ok(None);
                }
                Err(_) => {
                    debug!(%extractor_id, "Timed out waiting for catch-up");
                    break;
                }
            }
        }

        // Fold everything received into one message, in arrival order.
        let merged = results
            .into_iter()
            .reduce(|l, r| l.merge(r));

        if let Some(msg) = merged {
            debug!(%extractor_id, "Delayed extractor made progress!");
            self.transition(msg.header.clone(), block_history, stale_threshold)?;
            Ok(Some(msg))
        } else {
            // Nothing received: the stream may have been silent long enough to become stale.
            self.check_and_transition_to_stale_if_needed(stale_threshold, None)?;
            Ok(None)
        }
    }
402
403 fn check_and_transition_to_stale_if_needed(
405 &mut self,
406 stale_threshold: std::time::Duration,
407 fallback_header: Option<BlockHeader>,
408 ) -> Result<bool, BlockSynchronizerError> {
409 let now = Local::now().naive_utc();
410 let wait_duration = now.signed_duration_since(self.modify_ts);
411 let stale_threshold_chrono = ChronoDuration::from_std(stale_threshold)
412 .map_err(|e| BlockSynchronizerError::DurationConversionError(e.to_string()))?;
413
414 if wait_duration > stale_threshold_chrono {
415 let header_to_use = match (&self.state, fallback_header) {
416 (SynchronizerState::Ready(h), _) |
417 (SynchronizerState::Delayed(h), _) |
418 (SynchronizerState::Stale(h), _) => h.clone(),
419 (_, Some(h)) => h,
420 _ => BlockHeader::default(),
421 };
422
423 warn!(
424 extractor_id=%self.extractor_id,
425 last_message_at=?self.modify_ts,
426 "SynchronizerStream transition to stale due to timeout."
427 );
428 self.state = SynchronizerState::Stale(header_to_use);
429 self.modify_ts = now;
430 Ok(true)
431 } else {
432 Ok(false)
433 }
434 }
435
436 fn transition(
443 &mut self,
444 latest_retrieved: BlockHeader,
445 block_history: &BlockHistory,
446 stale_threshold: std::time::Duration,
447 ) -> Result<(), BlockSynchronizerError> {
448 let extractor_id = self.extractor_id.clone();
449 let last_message_at = self.modify_ts;
450 let block = &latest_retrieved;
451
452 match block_history.determine_block_position(&latest_retrieved)? {
453 BlockPosition::NextExpected => {
454 self.state = SynchronizerState::Ready(latest_retrieved.clone());
455 trace!(
456 next = %latest_retrieved,
457 extractor = %extractor_id,
458 "SynchronizerStream transition to next expected"
459 )
460 }
461 BlockPosition::Latest | BlockPosition::Delayed => {
462 if !self.check_and_transition_to_stale_if_needed(
463 stale_threshold,
464 Some(latest_retrieved.clone()),
465 )? {
466 warn!(
467 %extractor_id,
468 ?last_message_at,
469 %block,
470 "SynchronizerStream transition transition to delayed."
471 );
472 self.state = SynchronizerState::Delayed(latest_retrieved.clone());
473 }
474 }
475 BlockPosition::Advanced => {
476 info!(
477 %extractor_id,
478 ?last_message_at,
479 latest = opt(&block_history.latest()),
480 %block,
481 "SynchronizerStream transition to advanced."
482 );
483 self.state = SynchronizerState::Advanced(latest_retrieved.clone());
484 }
485 }
486 self.modify_ts = Local::now().naive_utc();
487 Ok(())
488 }
489
490 fn mark_errored(&mut self, error: SynchronizerError) {
495 self.state = SynchronizerState::Ended(error.to_string());
496 self.modify_ts = Local::now().naive_utc();
497 self.error = Some(error);
498 }
499
500 fn mark_closed(&mut self) {
506 if !matches!(self.state, SynchronizerState::Ended(_)) {
507 self.state = SynchronizerState::Ended("Closed".to_string());
508 self.modify_ts = Local::now().naive_utc();
509 }
510 }
511
512 fn mark_stale(&mut self, header: &BlockHeader) {
514 self.state = SynchronizerState::Stale(header.clone());
515 self.modify_ts = Local::now().naive_utc();
516 }
517
518 fn mark_ready(&mut self, header: &BlockHeader) {
520 self.state = SynchronizerState::Ready(header.clone());
521 self.modify_ts = Local::now().naive_utc();
522 }
523
524 fn has_ended(&self) -> bool {
525 matches!(self.state, SynchronizerState::Ended(_))
526 }
527
528 fn is_stale(&self) -> bool {
529 matches!(self.state, SynchronizerState::Stale(_))
530 }
531
532 fn is_advanced(&self) -> bool {
533 matches!(self.state, SynchronizerState::Advanced(_))
534 }
535
536 fn get_current_header(&self) -> Option<&BlockHeader> {
540 match &self.state {
541 SynchronizerState::Ready(b) |
542 SynchronizerState::Delayed(b) |
543 SynchronizerState::Advanced(b) => Some(b),
544 _ => None,
545 }
546 }
547}
548
/// Message emitted by the block synchronizer for every iteration of its main loop.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct FeedMessage<H = BlockHeader>
where
    H: HeaderLike,
{
    /// State messages received in this iteration, keyed by extractor name.
    pub state_msgs: HashMap<String, StateSyncMessage<H>>,
    /// State of every synchronizer stream, keyed by extractor name.
    pub sync_states: HashMap<String, SynchronizerState>,
}
557
558impl<H> FeedMessage<H>
559where
560 H: HeaderLike,
561{
562 fn new(
563 state_msgs: HashMap<String, StateSyncMessage<H>>,
564 sync_states: HashMap<String, SynchronizerState>,
565 ) -> Self {
566 Self { state_msgs, sync_states }
567 }
568}
569
570impl<S> BlockSynchronizer<S>
571where
572 S: StateSynchronizer,
573{
574 pub fn new(
575 block_time: std::time::Duration,
576 latency_buffer: std::time::Duration,
577 max_missed_blocks: u64,
578 ) -> Self {
579 Self {
580 synchronizers: None,
581 max_messages: None,
582 block_time,
583 latency_buffer,
584 startup_timeout: block_time.mul_f64(max_missed_blocks as f64),
585 max_missed_blocks,
586 }
587 }
588
589 pub fn max_messages(&mut self, val: usize) {
595 self.max_messages = Some(val);
596 }
597
598 pub fn startup_timeout(mut self, val: Duration) {
602 self.startup_timeout = val;
603 }
604
605 pub fn register_synchronizer(mut self, id: ExtractorIdentity, synchronizer: S) -> Self {
606 let mut registered = self.synchronizers.unwrap_or_default();
607 registered.insert(id, synchronizer);
608 self.synchronizers = Some(registered);
609 self
610 }
611
612 #[cfg(test)]
613 pub fn with_short_timeouts() -> Self {
614 Self::new(Duration::from_millis(10), Duration::from_millis(10), 3)
615 }
616
617 async fn cleanup_synchronizers(
620 mut state_sync_tasks: FuturesUnordered<JoinHandle<()>>,
621 sync_close_senders: Vec<oneshot::Sender<()>>,
622 ) {
623 for close_sender in sync_close_senders {
625 let _ = close_sender.send(());
626 }
627
628 let mut completed_tasks = 0;
630 while let Ok(Some(_)) = timeout(Duration::from_secs(5), state_sync_tasks.next()).await {
631 completed_tasks += 1;
632 }
633
634 let remaining_tasks = state_sync_tasks.len();
636 if remaining_tasks > 0 {
637 warn!(
638 completed = completed_tasks,
639 timed_out = remaining_tasks,
640 "Some synchronizers timed out during cleanup and may not have shut down cleanly"
641 );
642 }
643 }
644
645 pub async fn run(
650 mut self,
651 ) -> BlockSyncResult<(JoinHandle<()>, Receiver<BlockSyncResult<FeedMessage<BlockHeader>>>)>
652 {
653 trace!("Starting BlockSynchronizer...");
654 let state_sync_tasks = FuturesUnordered::new();
655 let mut synchronizers = self
656 .synchronizers
657 .take()
658 .ok_or(BlockSynchronizerError::NoSynchronizers)?;
659 let init_tasks = synchronizers
661 .values_mut()
662 .map(|s| s.initialize())
663 .collect::<Vec<_>>();
664 try_join_all(init_tasks).await?;
665
666 let mut sync_streams = Vec::with_capacity(synchronizers.len());
667 let mut sync_close_senders = Vec::new();
668 for (extractor_id, synchronizer) in synchronizers.drain() {
669 let (handle, rx) = synchronizer.start().await;
670 let (join_handle, close_sender) = handle.split();
671 state_sync_tasks.push(join_handle);
672 sync_close_senders.push(close_sender);
673
674 sync_streams.push(SynchronizerStream::new(&extractor_id, rx));
675 }
676
677 debug!("Waiting for initial synchronizer messages...");
680 let mut startup_futures = Vec::new();
681 for synchronizer in sync_streams.iter_mut() {
682 let fut = async {
683 let res = timeout(self.startup_timeout, synchronizer.rx.recv()).await;
684 (synchronizer, res)
685 };
686 startup_futures.push(fut);
687 }
688
689 let mut ready_sync_msgs = HashMap::new();
690 let initial_headers = join_all(startup_futures)
691 .await
692 .into_iter()
693 .filter_map(|(synchronizer, res)| {
694 let extractor_id = synchronizer.extractor_id.clone();
695 match res {
696 Ok(Some(Ok(msg))) => {
697 debug!(%extractor_id, height=?&msg.header.number, "Synchronizer started successfully!");
698 synchronizer.mark_ready(&msg.header);
700 let header = msg.header.clone();
701 ready_sync_msgs.insert(extractor_id.name.clone(), msg);
702 Some(header)
703 }
704 Ok(Some(Err(e))) => {
705 synchronizer.mark_errored(e);
706 None
707 }
708 Ok(None) => {
709 warn!(%extractor_id, "Synchronizer closed during startup");
714 synchronizer.mark_closed();
715 None
716 }
717 Err(_) => {
718 warn!(%extractor_id, "Timed out waiting for first message");
720 synchronizer.mark_stale(&BlockHeader::default());
721 None
722 }
723 }
724 })
725 .collect::<HashSet<_>>() .into_iter()
727 .collect::<Vec<_>>();
728
729 Self::check_streams(&sync_streams)?;
731 let mut block_history = BlockHistory::new(initial_headers, 15)?;
732 let start_header = block_history
734 .latest()
735 .ok_or(BlockHistoryError::EmptyHistory)?;
737 info!(
738 start_block=%start_header,
739 n_healthy=ready_sync_msgs.len(),
740 n_total=sync_streams.len(),
741 "Block synchronization started successfully!"
742 );
743
744 for stream in sync_streams.iter_mut() {
747 if let SynchronizerState::Ready(header) = stream.state.clone() {
748 if header.number < start_header.number {
749 debug!(
750 extractor_id=%stream.extractor_id,
751 synchronizer_block=header.number,
752 current_block=start_header.number,
753 "Marking synchronizer as delayed during initialization"
754 );
755 stream.state = SynchronizerState::Delayed(header);
756 }
757 }
758 }
759
760 let (sync_tx, sync_rx) = mpsc::channel(30);
761 let main_loop_jh = tokio::spawn(async move {
762 let mut n_iter = 1;
763 loop {
764 let msg = FeedMessage::new(
766 std::mem::take(&mut ready_sync_msgs),
767 sync_streams
768 .iter()
769 .map(|stream| (stream.extractor_id.name.to_string(), stream.state.clone()))
770 .collect(),
771 );
772 if sync_tx.send(Ok(msg)).await.is_err() {
773 info!("Receiver closed, block synchronizer terminating..");
774 return;
775 };
776
777 if let Some(max_messages) = self.max_messages {
779 if n_iter >= max_messages {
780 info!(max_messages, "StreamEnd");
781 return;
782 }
783 }
784 n_iter += 1;
785
786 let res = self
787 .handle_next_message(
788 &mut sync_streams,
789 &mut ready_sync_msgs,
790 &mut block_history,
791 )
792 .await;
793
794 if let Err(e) = res {
795 let _ = sync_tx.send(Err(e)).await;
797 return;
798 }
799 }
800 });
801
802 let nanny_jh = tokio::spawn(async move {
807 let _ = main_loop_jh.await.map_err(|e| {
809 if e.is_panic() {
810 error!("BlockSynchornizer main loop panicked: {e}")
811 }
812 });
813 debug!("Main loop exited. Closing synchronizers");
814 Self::cleanup_synchronizers(state_sync_tasks, sync_close_senders).await;
815 debug!("Shutdown complete");
816 });
817 Ok((nanny_jh, sync_rx))
818 }
819
    /// Runs one iteration of the main loop: advances every stream that has not ended,
    /// collects the messages received and updates the block history.
    ///
    /// Messages are stored in `ready_sync_msgs` keyed by extractor name. If any stream is
    /// `Advanced` afterwards, the block history is rebuilt from the current stream headers;
    /// otherwise the highest current header is pushed onto the existing history.
    ///
    /// # Errors
    /// Returns the first error of any stream, `NoReadySynchronizers` if no stream is healthy
    /// or none has a current header, and any error raised by the block history.
    async fn handle_next_message(
        &self,
        sync_streams: &mut [SynchronizerStream],
        ready_sync_msgs: &mut HashMap<String, StateSyncMessage<BlockHeader>>,
        block_history: &mut BlockHistory,
    ) -> BlockSyncResult<()> {
        let mut recv_futures = Vec::new();
        for stream in sync_streams.iter_mut() {
            // Ended streams can never produce another message.
            if stream.has_ended() {
                continue;
            }
            recv_futures.push(async {
                let res = stream
                    .try_advance(
                        block_history,
                        self.block_time,
                        self.latency_buffer,
                        // Stale threshold: the time it takes to miss `max_missed_blocks`.
                        self.block_time
                            .mul_f64(self.max_missed_blocks as f64),
                    )
                    .await?;
                Ok::<_, BlockSynchronizerError>(
                    res.map(|msg| (stream.extractor_id.name.clone(), msg)),
                )
            });
        }
        // All streams are advanced concurrently; the first error aborts the iteration.
        ready_sync_msgs.extend(
            join_all(recv_futures)
                .await
                .into_iter()
                .collect::<Result<Vec<_>, _>>()?
                .into_iter()
                .flatten(),
        );

        // Make sure at least one stream is neither ended nor stale.
        Self::check_streams(sync_streams)?;

        if sync_streams
            .iter()
            .any(SynchronizerStream::is_advanced)
        {
            *block_history = Self::reinit_block_history(sync_streams, block_history)?;
        } else {
            let header = sync_streams
                .iter()
                .filter_map(SynchronizerStream::get_current_header)
                .max_by_key(|b| b.number)
                .ok_or(BlockSynchronizerError::NoReadySynchronizers(
                    "Expected to have at least one synchronizer that is not stale or ended"
                        .to_string(),
                ))?;
            block_history.push(header.clone())?;
        }
        Ok(())
    }
895
896 fn reinit_block_history(
905 sync_streams: &mut [SynchronizerStream],
906 block_history: &mut BlockHistory,
907 ) -> Result<BlockHistory, BlockSynchronizerError> {
908 let previous = block_history
909 .latest()
910 .ok_or(BlockHistoryError::EmptyHistory)?;
912 let blocks = sync_streams
913 .iter()
914 .filter_map(SynchronizerStream::get_current_header)
915 .cloned()
916 .collect();
917 let new_block_history = BlockHistory::new(blocks, 10)?;
918 let latest = new_block_history
919 .latest()
920 .ok_or(BlockHistoryError::EmptyHistory)?;
922 info!(
923 %previous,
924 %latest,
925 "Advanced synchronizer detected. Reinitialized block history."
926 );
927 sync_streams
928 .iter_mut()
929 .for_each(|stream| {
930 if let Some(header) = stream.get_current_header() {
933 if header.number < latest.number {
934 stream.state = SynchronizerState::Delayed(header.clone());
935 } else if header.number == latest.number {
936 stream.state = SynchronizerState::Ready(header.clone());
937 }
938 }
939 });
940 Ok(new_block_history)
941 }
942
943 fn check_streams(sync_streams: &[SynchronizerStream]) -> BlockSyncResult<()> {
948 let mut latest_errored_stream: Option<&SynchronizerStream> = None;
949
950 for stream in sync_streams.iter() {
951 if !stream.has_ended() && !stream.is_stale() {
953 return Ok(());
954 }
955
956 if latest_errored_stream.is_none() ||
958 stream.modify_ts >
959 latest_errored_stream
960 .as_ref()
961 .unwrap()
962 .modify_ts
963 {
964 latest_errored_stream = Some(stream);
965 }
966 }
967
968 let last_error_reason = if let Some(stream) = latest_errored_stream {
969 if let Some(err) = &stream.error {
970 format!("Synchronizer for {} errored with: {err}", stream.extractor_id)
971 } else {
972 format!("Synchronizer for {} became: {}", stream.extractor_id, stream.state)
973 }
974 } else {
975 return Err(BlockSynchronizerError::NoSynchronizers);
976 };
977
978 let mut reason = vec![last_error_reason];
979
980 sync_streams.iter().for_each(|stream| {
981 reason.push(format!(
982 "{} reported as {} at {}",
983 stream.extractor_id, stream.state, stream.modify_ts
984 ))
985 });
986
987 Err(BlockSynchronizerError::NoReadySynchronizers(reason.join(", ")))
988 }
989}
990
991#[cfg(test)]
992mod tests {
993 use std::sync::Arc;
994
995 use async_trait::async_trait;
996 use test_log::test;
997 use tokio::sync::{oneshot, Mutex};
998 use tycho_common::dto::Chain;
999
1000 use super::*;
1001 use crate::feed::synchronizer::{SyncResult, SynchronizerTaskHandle};
1002
    /// Behaviour of the task spawned by [`MockStateSync`] when it is started.
    #[derive(Clone, Debug)]
    enum MockBehavior {
        /// Waits for a close signal, then records that it was received and exits.
        Normal,
        /// Never reacts to close signals; sleeps in an endless loop.
        IgnoreClose,
        /// Sends a connection error on the message channel and exits right away.
        ExitImmediately,
    }
1009
    /// Receiving end of the channel on which the mock delivers its state messages.
    type HeaderReceiver = Receiver<SyncResult<StateSyncMessage<BlockHeader>>>;
1011
    /// Test double for a state synchronizer whose messages are injected by the test.
    #[derive(Clone)]
    struct MockStateSync {
        /// Sender used by the test (and the mock task) to inject messages.
        header_tx: mpsc::Sender<SyncResult<StateSyncMessage<BlockHeader>>>,
        /// Receiver handed out by `start`; `None` once it has been taken.
        header_rx: Arc<Mutex<Option<HeaderReceiver>>>,
        /// Set to `true` by the mock task after it observed a close signal.
        close_received: Arc<Mutex<bool>>,
        /// Behaviour of the task spawned in `start`.
        behavior: MockBehavior,
        /// Sender that lets the test trigger a close; populated by `start`.
        close_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
    }
1021
1022 impl MockStateSync {
1023 fn new() -> Self {
1024 Self::with_behavior(MockBehavior::Normal)
1025 }
1026
1027 fn with_behavior(behavior: MockBehavior) -> Self {
1028 let (tx, rx) = mpsc::channel(1);
1029 Self {
1030 header_tx: tx,
1031 header_rx: Arc::new(Mutex::new(Some(rx))),
1032 close_received: Arc::new(Mutex::new(false)),
1033 behavior,
1034 close_tx: Arc::new(Mutex::new(None)),
1035 }
1036 }
1037
1038 async fn was_close_received(&self) -> bool {
1039 *self.close_received.lock().await
1040 }
1041
1042 async fn send_header(&self, header: StateSyncMessage<BlockHeader>) -> Result<(), String> {
1043 self.header_tx
1044 .send(Ok(header))
1045 .await
1046 .map_err(|e| format!("sending header failed: {e}"))
1047 }
1048
1049 async fn trigger_close(&self) {
1051 if let Some(close_tx) = self.close_tx.lock().await.take() {
1052 let _ = close_tx.send(());
1053 }
1054 }
1055 }
1056
    #[async_trait]
    impl StateSynchronizer for MockStateSync {
        /// Initialization always succeeds for the mock.
        async fn initialize(&mut self) -> SyncResult<()> {
            Ok(())
        }

        /// Spawns the mock task according to the configured behaviour and hands out the
        /// message receiver.
        ///
        /// # Panics
        /// Panics if the receiver was already taken, i.e. if the mock (or one of its clones)
        /// was started before.
        async fn start(
            mut self,
        ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>) {
            let block_rx = {
                let mut guard = self.header_rx.lock().await;
                guard
                    .take()
                    .expect("Block receiver was not set!")
            };

            // Two independent close channels: one owned by the task handle, one kept so the
            // test can trigger a close through `trigger_close`.
            let (close_tx_for_handle, close_rx) = oneshot::channel();
            let (close_tx_for_test, close_rx_for_test) = oneshot::channel();

            {
                let mut guard = self.close_tx.lock().await;
                *guard = Some(close_tx_for_test);
            }

            let behavior = self.behavior.clone();
            let close_received_clone = self.close_received.clone();
            let tx = self.header_tx.clone();

            let jh = tokio::spawn(async move {
                match behavior {
                    MockBehavior::IgnoreClose => {
                        // Never exits on its own and never looks at the close channels.
                        loop {
                            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
                        }
                    }
                    MockBehavior::ExitImmediately => {
                        tx.send(SyncResult::Err(SynchronizerError::ConnectionError(
                            "Simulated immediate task failure".to_string(),
                        )))
                        .await
                        .unwrap();
                    }
                    MockBehavior::Normal => {
                        // Whichever close signal resolves first ends the task.
                        let _ = tokio::select! {
                            result = close_rx => result,
                            result = close_rx_for_test => result,
                        };
                        let mut guard = close_received_clone.lock().await;
                        *guard = true;
                    }
                }
            });

            let handle = SynchronizerTaskHandle::new(jh, close_tx_for_handle);
            (handle, block_rx)
        }
    }
1122
1123 fn header_message(block: u8) -> StateSyncMessage<BlockHeader> {
1124 StateSyncMessage {
1125 header: BlockHeader {
1126 number: block as u64,
1127 hash: Bytes::from(vec![block]),
1128 parent_hash: Bytes::from(vec![block - 1]),
1129 revert: false,
1130 timestamp: 1000,
1131 },
1132 ..Default::default()
1133 }
1134 }
1135
1136 async fn receive_message(rx: &mut Receiver<BlockSyncResult<FeedMessage>>) -> FeedMessage {
1137 timeout(Duration::from_millis(100), rx.recv())
1138 .await
1139 .expect("Responds in time")
1140 .expect("Should receive first message")
1141 .expect("No error")
1142 }
1143
1144 async fn setup_block_sync(
1145 ) -> (MockStateSync, MockStateSync, JoinHandle<()>, Receiver<BlockSyncResult<FeedMessage>>)
1146 {
1147 setup_block_sync_with_behaviour(MockBehavior::Normal, MockBehavior::Normal).await
1148 }
1149
1150 async fn setup_block_sync_with_behaviour(
1152 v2_behavior: MockBehavior,
1153 v3_behavior: MockBehavior,
1154 ) -> (MockStateSync, MockStateSync, JoinHandle<()>, Receiver<BlockSyncResult<FeedMessage>>)
1155 {
1156 let v2_sync = MockStateSync::with_behavior(v2_behavior);
1157 let v3_sync = MockStateSync::with_behavior(v3_behavior);
1158
1159 let mut block_sync = BlockSynchronizer::new(
1161 Duration::from_millis(20), Duration::from_millis(10), 3, );
1165 block_sync.max_messages(10); let block_sync = block_sync
1168 .register_synchronizer(
1169 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
1170 v2_sync.clone(),
1171 )
1172 .register_synchronizer(
1173 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
1174 v3_sync.clone(),
1175 );
1176
1177 let block1_msg = header_message(1);
1179 let _ = v2_sync
1180 .send_header(block1_msg.clone())
1181 .await;
1182 let _ = v3_sync
1183 .send_header(block1_msg.clone())
1184 .await;
1185
1186 let (nanny_handle, mut rx) = block_sync
1188 .run()
1189 .await
1190 .expect("BlockSynchronizer failed to start");
1191
1192 let first_feed_msg = receive_message(&mut rx).await;
1193 assert_eq!(first_feed_msg.state_msgs.len(), 2);
1194 assert!(matches!(
1195 first_feed_msg
1196 .sync_states
1197 .get("uniswap-v2")
1198 .unwrap(),
1199 SynchronizerState::Ready(_)
1200 ));
1201 assert!(matches!(
1202 first_feed_msg
1203 .sync_states
1204 .get("uniswap-v3")
1205 .unwrap(),
1206 SynchronizerState::Ready(_)
1207 ));
1208
1209 (v2_sync, v3_sync, nanny_handle, rx)
1210 }
1211
1212 async fn shutdown_block_synchronizer(
1213 v2_sync: &MockStateSync,
1214 v3_sync: &MockStateSync,
1215 nanny_handle: JoinHandle<()>,
1216 ) {
1217 v3_sync.trigger_close().await;
1218 v2_sync.trigger_close().await;
1219
1220 timeout(Duration::from_millis(100), nanny_handle)
1221 .await
1222 .expect("Nanny failed to exit within time")
1223 .expect("Nanny panicked");
1224 }
1225
1226 #[test(tokio::test)]
1227 async fn test_two_ready_synchronizers() {
1228 let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1229
1230 let second_msg = header_message(2);
1231 v2_sync
1232 .send_header(second_msg.clone())
1233 .await
1234 .expect("send_header failed");
1235 v3_sync
1236 .send_header(second_msg.clone())
1237 .await
1238 .expect("send_header failed");
1239 let second_feed_msg = receive_message(&mut rx).await;
1240
1241 let exp2 = FeedMessage {
1242 state_msgs: [
1243 ("uniswap-v2".to_string(), second_msg.clone()),
1244 ("uniswap-v3".to_string(), second_msg.clone()),
1245 ]
1246 .into_iter()
1247 .collect(),
1248 sync_states: [
1249 ("uniswap-v3".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
1250 ("uniswap-v2".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
1251 ]
1252 .into_iter()
1253 .collect(),
1254 };
1255 assert_eq!(second_feed_msg, exp2);
1256
1257 shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1258 }
1259
    /// A synchronizer that misses a block is reported as `Delayed` and becomes `Ready` again
    /// once it delivers the missed blocks.
    #[test(tokio::test)]
    async fn test_delayed_synchronizer_catches_up() {
        let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;

        // Only v2 delivers block 2; v3 stays silent.
        let block2_msg = header_message(2);
        v2_sync
            .send_header(block2_msg.clone())
            .await
            .expect("send_header failed");

        let second_feed_msg = receive_message(&mut rx).await;
        debug!("Consumed second message for v2");

        // v2 is ready on block 2, v3 is delayed on block 1 and contributed no message.
        assert!(second_feed_msg
            .state_msgs
            .contains_key("uniswap-v2"));
        assert!(matches!(
            second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
            SynchronizerState::Ready(header) if header.number == 2
        ));
        assert!(!second_feed_msg
            .state_msgs
            .contains_key("uniswap-v3"));
        assert!(matches!(
            second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
            SynchronizerState::Delayed(header) if header.number == 1
        ));

        // v3 now delivers the missed block 2 ...
        v3_sync
            .send_header(block2_msg.clone())
            .await
            .expect("send_header failed");

        // ... and both deliver block 3.
        let block3_msg = header_message(3);
        v2_sync
            .send_header(block3_msg.clone())
            .await
            .expect("send_header failed");
        v3_sync
            .send_header(block3_msg)
            .await
            .expect("send_header failed");

        let mut third_feed_msg = receive_message(&mut rx).await;

        // The next feed message may not contain v2 yet; in that case read one more message.
        if !third_feed_msg
            .state_msgs
            .contains_key("uniswap-v2")
        {
            third_feed_msg = rx
                .recv()
                .await
                .expect("header channel was closed")
                .expect("no error");
        }
        // Both synchronizers contributed and are ready on block 3.
        assert!(third_feed_msg
            .state_msgs
            .contains_key("uniswap-v2"));
        assert!(third_feed_msg
            .state_msgs
            .contains_key("uniswap-v3"));
        assert!(matches!(
            third_feed_msg.sync_states.get("uniswap-v2").unwrap(),
            SynchronizerState::Ready(header) if header.number == 3
        ));
        assert!(matches!(
            third_feed_msg.sync_states.get("uniswap-v3").unwrap(),
            SynchronizerState::Ready(header) if header.number == 3
        ));

        shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
    }
1340
    /// Synchronizers that start on different blocks: the one on the older block starts out
    /// as `Delayed` and becomes `Ready` once it has caught up.
    #[test(tokio::test)]
    async fn test_different_start_blocks() {
        let v2_sync = MockStateSync::new();
        let v3_sync = MockStateSync::new();
        let block_sync = BlockSynchronizer::with_short_timeouts()
            .register_synchronizer(
                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
                v2_sync.clone(),
            )
            .register_synchronizer(
                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
                v3_sync.clone(),
            );

        let block1_msg = header_message(1);
        let block2_msg = header_message(2);

        // v2 starts on block 1 while v3 starts on block 2.
        let _ = v2_sync
            .send_header(block1_msg.clone())
            .await;
        v3_sync
            .send_header(block2_msg.clone())
            .await
            .expect("send_header failed");

        let (jh, mut rx) = block_sync
            .run()
            .await
            .expect("BlockSynchronizer failed to start.");

        // v2 is behind the start block and therefore delayed; v3 is ready on block 2.
        let first_feed_msg = receive_message(&mut rx).await;
        assert!(matches!(
            first_feed_msg.sync_states.get("uniswap-v2").unwrap(),
            SynchronizerState::Delayed(header) if header.number == 1
        ));
        assert!(matches!(
            first_feed_msg.sync_states.get("uniswap-v3").unwrap(),
            SynchronizerState::Ready(header) if header.number == 2
        ));

        // v2 catches up with block 2 ...
        v2_sync
            .send_header(block2_msg.clone())
            .await
            .expect("send_header failed");

        // ... and both move on to block 3.
        let block3_msg = header_message(3);
        let _ = v2_sync
            .send_header(block3_msg.clone())
            .await;
        v3_sync
            .send_header(block3_msg.clone())
            .await
            .expect("send_header failed");

        // Both synchronizers contributed a message and are ready on block 3.
        let second_feed_msg = receive_message(&mut rx).await;
        assert_eq!(second_feed_msg.state_msgs.len(), 2);
        assert!(matches!(
            second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
            SynchronizerState::Ready(header) if header.number == 3
        ));
        assert!(matches!(
            second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
            SynchronizerState::Ready(header) if header.number == 3
        ));

        shutdown_block_synchronizer(&v2_sync, &v3_sync, jh).await;
    }
1414
1415 #[test(tokio::test)]
1416 async fn test_synchronizer_fails_other_goes_stale() {
1417 let (_v2_sync, v3_sync, nanny_handle, mut sync_rx) =
1418 setup_block_sync_with_behaviour(MockBehavior::ExitImmediately, MockBehavior::Normal)
1419 .await;
1420
1421 let mut error_reported = false;
1422 for _ in 0..3 {
1423 if let Some(msg) = sync_rx.recv().await {
1424 match msg {
1425 Err(_) => error_reported = true,
1426 Ok(msg) => {
1427 assert!(matches!(
1428 msg.sync_states
1429 .get("uniswap-v3")
1430 .unwrap(),
1431 SynchronizerState::Delayed(_)
1432 ));
1433 assert!(matches!(
1434 msg.sync_states
1435 .get("uniswap-v2")
1436 .unwrap(),
1437 SynchronizerState::Ended(_)
1438 ));
1439 }
1440 }
1441 }
1442 }
1443 assert!(error_reported, "BlockSynchronizer did not report final error");
1444
1445 let result = timeout(Duration::from_secs(2), nanny_handle).await;
1447 assert!(result.is_ok(), "Nanny should complete when synchronizer task exits");
1448
1449 assert!(
1451 v3_sync.was_close_received().await,
1452 "v3_sync should have received close signal during cleanup"
1453 );
1454 }
1455
1456 #[test(tokio::test)]
1457 async fn test_cleanup_timeout_warning() {
1458 let (_v2_sync, _v3_sync, nanny_handle, _rx) = setup_block_sync_with_behaviour(
1461 MockBehavior::ExitImmediately,
1462 MockBehavior::IgnoreClose,
1463 )
1464 .await;
1465
1466 let result = timeout(Duration::from_secs(10), nanny_handle).await;
1468 assert!(
1469 result.is_ok(),
1470 "Nanny should complete even when some synchronizers timeout during cleanup"
1471 );
1472
1473 }
1477
1478 #[test(tokio::test)]
1479 async fn test_one_synchronizer_goes_stale_while_other_works() {
1480 let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1482
1483 let block2_msg = header_message(2);
1485 let _ = v3_sync
1486 .send_header(block2_msg.clone())
1487 .await;
1488 let second_feed_msg = receive_message(&mut rx).await;
1492 assert!(second_feed_msg
1493 .state_msgs
1494 .contains_key("uniswap-v3"));
1495 assert!(!second_feed_msg
1496 .state_msgs
1497 .contains_key("uniswap-v2"));
1498 assert!(matches!(
1499 second_feed_msg
1500 .sync_states
1501 .get("uniswap-v3")
1502 .unwrap(),
1503 SynchronizerState::Ready(_)
1504 ));
1505 if let Some(v2_state) = second_feed_msg
1507 .sync_states
1508 .get("uniswap-v2")
1509 {
1510 if matches!(v2_state, SynchronizerState::Delayed(_)) {
1511 assert!(
1513 !nanny_handle.is_finished(),
1514 "Nanny should still be running when synchronizer is delayed (not stale yet)"
1515 );
1516 }
1517 }
1518
1519 tokio::time::sleep(Duration::from_millis(15)).await;
1521
1522 let block3_msg = header_message(3);
1524 let _ = v3_sync
1525 .send_header(block3_msg.clone())
1526 .await;
1527
1528 tokio::time::sleep(Duration::from_millis(40)).await;
1529
1530 let mut stale_found = false;
1531 for _ in 0..2 {
1532 if let Some(Ok(msg)) = rx.recv().await {
1533 if let Some(SynchronizerState::Stale(_)) = msg.sync_states.get("uniswap-v2") {
1534 stale_found = true;
1535 }
1536 }
1537 }
1538 assert!(stale_found, "v2 synchronizer should be stale");
1539
1540 shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1541 }
1542
1543 #[test(tokio::test)]
1544 async fn test_all_synchronizers_go_stale_main_loop_exits() {
1545 let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1547
1548 let mut seen_delayed = false;
1553
1554 let timeout_duration = Duration::from_millis(500); let start_time = tokio::time::Instant::now();
1558
1559 while let Ok(Some(Ok(msg))) =
1560 tokio::time::timeout(Duration::from_millis(50), rx.recv()).await
1561 {
1562 if !seen_delayed {
1564 let v2_state = msg.sync_states.get("uniswap-v2");
1565 let v3_state = msg.sync_states.get("uniswap-v3");
1566
1567 if matches!(v2_state, Some(SynchronizerState::Delayed(_))) ||
1568 matches!(v3_state, Some(SynchronizerState::Delayed(_)))
1569 {
1570 seen_delayed = true;
1571 assert!(!nanny_handle.is_finished(),
1573 "Nanny should still be running when synchronizers are delayed (not stale yet)");
1574 break;
1576 }
1577 }
1578
1579 if start_time.elapsed() > timeout_duration {
1581 break;
1582 }
1583 }
1584 assert!(seen_delayed, "Synchronizers should transition to Delayed state first");
1586
1587 let mut error_reported = false;
1588 while let Some(msg) = rx.recv().await {
1590 if let Err(e) = msg {
1591 assert!(e
1592 .to_string()
1593 .contains("became: Stale(1)"));
1594 assert!(e
1595 .to_string()
1596 .contains("reported as Stale(1)"));
1597 error_reported = true;
1598 }
1599 }
1600 assert!(error_reported, "Expected the channel to report an error before closing");
1601
1602 let nanny_result = timeout(Duration::from_secs(2), nanny_handle).await;
1604 assert!(nanny_result.is_ok(), "Nanny should complete when main loop exits");
1605
1606 assert!(
1608 v2_sync.was_close_received().await,
1609 "v2_sync should have received close signal during cleanup"
1610 );
1611 assert!(
1612 v3_sync.was_close_received().await,
1613 "v3_sync should have received close signal during cleanup"
1614 );
1615 }
1616
1617 #[test(tokio::test)]
1618 async fn test_stale_synchronizer_recovers() {
1619 let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1621
1622 tokio::time::sleep(Duration::from_millis(50)).await;
1624 let block2_msg = header_message(2);
1625 let _ = v2_sync
1626 .send_header(block2_msg.clone())
1627 .await;
1628
1629 for _ in 0..2 {
1631 if let Some(msg) = rx.recv().await {
1632 if let Ok(msg) = msg {
1633 if matches!(
1634 msg.sync_states
1635 .get("uniswap-v2")
1636 .unwrap(),
1637 SynchronizerState::Ready(_)
1638 ) {
1639 assert!(matches!(
1640 msg.sync_states
1641 .get("uniswap-v3")
1642 .unwrap(),
1643 SynchronizerState::Delayed(_)
1644 ));
1645 break;
1646 };
1647 }
1648 } else {
1649 panic!("Channel closed unexpectedly")
1650 }
1651 }
1652
1653 tokio::time::sleep(Duration::from_millis(15)).await;
1655 let block3_msg = header_message(3);
1656 let _ = v2_sync
1657 .send_header(block3_msg.clone())
1658 .await;
1659 let third_msg = receive_message(&mut rx).await;
1660 dbg!(&third_msg);
1661 assert!(matches!(
1662 third_msg
1663 .sync_states
1664 .get("uniswap-v2")
1665 .unwrap(),
1666 SynchronizerState::Ready(_)
1667 ));
1668 assert!(matches!(
1669 third_msg
1670 .sync_states
1671 .get("uniswap-v3")
1672 .unwrap(),
1673 SynchronizerState::Stale(_)
1674 ));
1675
1676 let block4_msg = header_message(4);
1677 let _ = v3_sync
1678 .send_header(block2_msg.clone())
1679 .await;
1680 let _ = v3_sync
1681 .send_header(block3_msg.clone())
1682 .await;
1683 let _ = v3_sync
1684 .send_header(block4_msg.clone())
1685 .await;
1686 let _ = v2_sync
1687 .send_header(block4_msg.clone())
1688 .await;
1689 let fourth_msg = receive_message(&mut rx).await;
1690 assert!(matches!(
1691 fourth_msg
1692 .sync_states
1693 .get("uniswap-v2")
1694 .unwrap(),
1695 SynchronizerState::Ready(_)
1696 ));
1697 assert!(matches!(
1698 fourth_msg
1699 .sync_states
1700 .get("uniswap-v3")
1701 .unwrap(),
1702 SynchronizerState::Ready(_)
1703 ));
1704
1705 shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1706
1707 assert!(
1709 v2_sync.was_close_received().await,
1710 "v2_sync should have received close signal during cleanup"
1711 );
1712 assert!(
1713 v3_sync.was_close_received().await,
1714 "v3_sync should have received close signal during cleanup"
1715 );
1716 }
1717
1718 #[test(tokio::test)]
1719 async fn test_all_synchronizer_advanced() {
1720 let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1724
1725 let block3 = header_message(3);
1726 v2_sync
1727 .send_header(block3.clone())
1728 .await
1729 .unwrap();
1730 v3_sync
1731 .send_header(block3)
1732 .await
1733 .unwrap();
1734
1735 let msg = receive_message(&mut rx).await;
1736 matches!(
1737 msg.sync_states
1738 .get("uniswap-v2")
1739 .unwrap(),
1740 SynchronizerState::Ready(_)
1741 );
1742 matches!(
1743 msg.sync_states
1744 .get("uniswap-v3")
1745 .unwrap(),
1746 SynchronizerState::Ready(_)
1747 );
1748
1749 shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1750 }
1751
1752 #[test(tokio::test)]
1753 async fn test_one_synchronizer_advanced() {
1754 let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1755
1756 let block2 = header_message(2);
1757 let block4 = header_message(4);
1758 v2_sync
1759 .send_header(block4.clone())
1760 .await
1761 .unwrap();
1762 v3_sync
1763 .send_header(block2.clone())
1764 .await
1765 .unwrap();
1766
1767 let msg = receive_message(&mut rx).await;
1768 matches!(
1769 msg.sync_states
1770 .get("uniswap-v2")
1771 .unwrap(),
1772 SynchronizerState::Ready(_)
1773 );
1774 matches!(
1775 msg.sync_states
1776 .get("uniswap-v3")
1777 .unwrap(),
1778 SynchronizerState::Delayed(_)
1779 );
1780
1781 shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1782 }
1783}