1use std::{
2 collections::{HashMap, HashSet},
3 fmt::{Display, Formatter},
4 time::Duration,
5};
6
7use chrono::{Duration as ChronoDuration, Local, NaiveDateTime};
8use futures03::{
9 future::{join_all, try_join_all},
10 stream::FuturesUnordered,
11 StreamExt,
12};
13use serde::{Deserialize, Serialize};
14use thiserror::Error;
15use tokio::{
16 sync::{
17 mpsc::{self, Receiver},
18 oneshot,
19 },
20 task::JoinHandle,
21 time::timeout,
22};
23use tracing::{debug, error, info, trace, warn};
24use tycho_common::{
25 display::opt,
26 dto::{BlockChanges, ExtractorIdentity},
27 Bytes,
28};
29
30use crate::feed::{
31 block_history::{BlockHistory, BlockHistoryError, BlockPosition},
32 synchronizer::{StateSyncMessage, StateSynchronizer, SyncResult, SynchronizerError},
33};
34
35mod block_history;
36pub mod component_tracker;
37pub mod synchronizer;
38
/// Abstraction over the header type carried by feed messages.
///
/// Both methods consume the header.
pub trait HeaderLike {
    /// Returns the header as a [`BlockHeader`], or `None` if the implementor cannot provide one.
    fn block(self) -> Option<BlockHeader>;
    /// Returns a block number or a timestamp, depending on the implementor.
    ///
    /// The [`BlockHeader`] implementation returns the block number.
    fn block_number_or_timestamp(self) -> u64;
}
47
/// Header information of a block as tracked by the block synchronizer.
#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, Eq, Hash)]
pub struct BlockHeader {
    /// Hash of the block.
    pub hash: Bytes,
    /// Number (height) of the block.
    pub number: u64,
    /// Hash of the parent block.
    pub parent_hash: Bytes,
    /// Revert flag; copied from `BlockChanges::revert` when built from block changes.
    pub revert: bool,
    /// Block timestamp; Unix seconds when built via `From<&BlockChanges>`.
    pub timestamp: u64,
    /// Index of the partial block, if this header describes a partial block.
    ///
    /// Defaults to `None` when missing on deserialization and is omitted from the
    /// serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub partial_block_index: Option<u32>,
}
59
impl BlockHeader {
    /// Returns `true` if this header has a partial block index set.
    fn is_partial(&self) -> bool {
        self.partial_block_index.is_some()
    }
}
65
66impl Display for BlockHeader {
67 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
68 let short_hash = if self.hash.len() >= 4 {
70 hex::encode(&self.hash[..4]) } else {
72 hex::encode(&self.hash)
73 };
74
75 match self.partial_block_index {
76 Some(idx) => write!(f, "Block #{} [0x{}..] (partial {})", self.number, short_hash, idx),
77 None => write!(f, "Block #{} [0x{}..]", self.number, short_hash),
78 }
79 }
80}
81
82impl From<&BlockChanges> for BlockHeader {
83 fn from(block_changes: &BlockChanges) -> Self {
84 let block = &block_changes.block;
85 Self {
86 hash: block.hash.clone(),
87 number: block.number,
88 parent_hash: block.parent_hash.clone(),
89 revert: block_changes.revert,
90 timestamp: block.ts.and_utc().timestamp() as u64,
91 partial_block_index: block_changes.partial_block_index,
92 }
93 }
94}
95
impl HeaderLike for BlockHeader {
    /// A `BlockHeader` is its own block header, so this always returns `Some(self)`.
    fn block(self) -> Option<BlockHeader> {
        Some(self)
    }

    /// Returns the block number.
    fn block_number_or_timestamp(self) -> u64 {
        self.number
    }
}
105
/// Errors produced by the [`BlockSynchronizer`] and its streams.
#[derive(Error, Debug)]
pub enum BlockSynchronizerError {
    /// A wrapped [`SynchronizerError`]; converted automatically via `?`.
    #[error("Failed to initialize synchronizer: {0}")]
    InitializationError(#[from] SynchronizerError),

    /// A wrapped [`BlockHistoryError`]; converted automatically via `?`.
    #[error("Failed to process new block: {0}")]
    BlockHistoryError(#[from] BlockHistoryError),

    /// Every synchronizer stream is either ended or stale; the payload describes why.
    #[error("Not a single synchronizer was ready: {0}")]
    NoReadySynchronizers(String),

    /// No synchronizers were registered (or no streams were available to check).
    #[error("No synchronizers were set")]
    NoSynchronizers,

    /// A `std::time::Duration` could not be converted into a chrono duration.
    #[error("Failed to convert duration: {0}")]
    DurationConversionError(String),
}
123
/// Result alias for operations that fail with a [`BlockSynchronizerError`].
type BlockSyncResult<T> = Result<T, BlockSynchronizerError>;
125
/// Aligns the messages of several state synchronizers and emits them as a single feed.
pub struct BlockSynchronizer<S> {
    /// Registered synchronizers keyed by extractor; `None` until the first one is
    /// registered and taken out again by `run`.
    synchronizers: Option<HashMap<ExtractorIdentity, S>>,
    /// Block time; the base wait when advancing streams and the unit of the stale threshold.
    block_time: std::time::Duration,
    /// Extra time added on top of `block_time` when waiting on ready or delayed streams.
    latency_buffer: std::time::Duration,
    /// Maximum time to wait for the first message of each synchronizer in `run`.
    startup_timeout: std::time::Duration,
    /// If set, the main loop stops after emitting this many feed messages.
    max_messages: Option<usize>,
    /// Multiplied with `block_time` to obtain the threshold after which a stream is stale.
    max_missed_blocks: u64,
}
186
/// State of a single synchronizer stream as reported in [`FeedMessage::sync_states`].
///
/// Serialized with an internal `status` tag using the lowercase variant name.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "lowercase")]
pub enum SynchronizerState {
    /// Initial state of a freshly created stream; no message has been processed yet.
    Started,
    /// The stream's latest header is at the next expected position of the block history.
    Ready(BlockHeader),
    /// The stream did not deliver a block in time or its header is behind; holds its last header.
    Delayed(BlockHeader),
    /// The stream made no progress for longer than the stale threshold; holds its last header
    /// (a default header if none was ever received).
    Stale(BlockHeader),
    /// The stream's header is ahead of the current block history.
    Advanced(BlockHeader),
    /// The stream has ended; holds the reason (an error message or `"Closed"`).
    Ended(String),
}
209
210impl Display for SynchronizerState {
211 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
212 match self {
213 SynchronizerState::Started => write!(f, "Started"),
214 SynchronizerState::Ready(b) => write!(f, "Started({})", b.number),
215 SynchronizerState::Delayed(b) => write!(f, "Delayed({})", b.number),
216 SynchronizerState::Stale(b) => write!(f, "Stale({})", b.number),
217 SynchronizerState::Advanced(b) => write!(f, "Advanced({})", b.number),
218 SynchronizerState::Ended(reason) => write!(f, "Ended({})", reason),
219 }
220 }
221}
222
/// Tracks the state of one synchronizer together with the channel its messages arrive on.
pub struct SynchronizerStream {
    /// Identity of the extractor this stream belongs to.
    extractor_id: ExtractorIdentity,
    /// Current state of the stream.
    state: SynchronizerState,
    /// Error reported by the synchronizer, if the stream ended because of one.
    error: Option<SynchronizerError>,
    /// Time of the last state modification (naive UTC).
    modify_ts: NaiveDateTime,
    /// Receiving end of the synchronizer's message channel.
    rx: Receiver<SyncResult<StateSyncMessage<BlockHeader>>>,
}
230
231impl SynchronizerStream {
232 fn new(
233 extractor_id: &ExtractorIdentity,
234 rx: Receiver<SyncResult<StateSyncMessage<BlockHeader>>>,
235 ) -> Self {
236 Self {
237 extractor_id: extractor_id.clone(),
238 state: SynchronizerState::Started,
239 error: None,
240 modify_ts: Local::now().naive_utc(),
241 rx,
242 }
243 }
244 async fn try_advance(
245 &mut self,
246 block_history: &BlockHistory,
247 block_time: std::time::Duration,
248 latency_buffer: std::time::Duration,
249 stale_threshold: std::time::Duration,
250 ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
251 let extractor_id = self.extractor_id.clone();
252 let latest_block = block_history.latest();
253
254 match &self.state {
255 SynchronizerState::Started | SynchronizerState::Ended(_) => {
256 warn!(state=?&self.state, "Advancing Synchronizer in this state not supported!");
257 Ok(None)
258 }
259 SynchronizerState::Advanced(b) => {
260 let future_block = b.clone();
261 self.transition(future_block, block_history, stale_threshold)?;
263 Ok(None)
264 }
265 SynchronizerState::Ready(previous_block) => {
266 self.try_recv_next_expected(
268 block_time + latency_buffer,
269 block_history,
270 previous_block.clone(),
271 stale_threshold,
272 )
273 .await
274 }
275 SynchronizerState::Delayed(old_block) => {
276 debug!(
278 %old_block,
279 latest_block=opt(&latest_block),
280 %extractor_id,
281 "Trying to catch up to latest block"
282 );
283 self.try_catch_up(block_history, block_time + latency_buffer, stale_threshold)
284 .await
285 }
286 SynchronizerState::Stale(old_block) => {
287 debug!(
289 %old_block,
290 latest_block=opt(&latest_block),
291 %extractor_id,
292 "Trying to catch up stale synchronizer to latest block"
293 );
294 self.try_catch_up(block_history, block_time, stale_threshold)
295 .await
296 }
297 }
298 }
299
300 async fn try_recv_next_expected(
308 &mut self,
309 max_wait: std::time::Duration,
310 block_history: &BlockHistory,
311 previous_block: BlockHeader,
312 stale_threshold: std::time::Duration,
313 ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
314 let extractor_id = self.extractor_id.clone();
315 match timeout(max_wait, self.rx.recv()).await {
316 Ok(Some(Ok(msg))) => {
317 self.transition(msg.header.clone(), block_history, stale_threshold)?;
318 Ok(Some(msg))
319 }
320 Ok(Some(Err(e))) => {
321 self.mark_errored(e);
323 Ok(None)
324 }
325 Ok(None) => {
326 warn!(
329 %extractor_id,
330 "Tried to poll from closed synchronizer.",
331 );
332 self.mark_closed();
333 Ok(None)
334 }
335 Err(_) => {
336 debug!(%extractor_id, %previous_block, "No block received within time limit.");
338 self.state = SynchronizerState::Delayed(previous_block.clone());
341 self.modify_ts = Local::now().naive_utc();
342 Ok(None)
343 }
344 }
345 }
346
    /// Drains messages from a lagging stream until it reaches the next expected block
    /// or `max_wait` has elapsed.
    ///
    /// All messages received within the window are merged into a single message. If at
    /// least one message was received, the stream transitions according to the merged
    /// header and the merged message is returned. Otherwise the stream is checked for
    /// staleness and `Ok(None)` is returned.
    ///
    /// A synchronizer error or a closed channel ends the stream and returns `Ok(None)`
    /// immediately, discarding anything collected so far.
    async fn try_catch_up(
        &mut self,
        block_history: &BlockHistory,
        max_wait: std::time::Duration,
        stale_threshold: std::time::Duration,
    ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
        let mut results = Vec::new();
        let extractor_id = self.extractor_id.clone();

        // A single deadline bounds the whole catch-up, no matter how many messages arrive.
        let deadline = std::time::Instant::now() + max_wait;

        while std::time::Instant::now() < deadline {
            match timeout(
                // Only wait for whatever is left of the overall budget.
                deadline.saturating_duration_since(std::time::Instant::now()),
                self.rx.recv(),
            )
            .await
            {
                Ok(Some(Ok(msg))) => {
                    debug!(%extractor_id, block=%msg.header, "Received new message during catch-up");
                    let block_pos = block_history.determine_block_position(&msg.header)?;
                    results.push(msg);
                    // Caught up: stop draining once the next expected (or partial) block arrived.
                    if matches!(block_pos, BlockPosition::NextExpected | BlockPosition::NextPartial)
                    {
                        break;
                    }
                }
                Ok(Some(Err(e))) => {
                    self.mark_errored(e);
                    return Ok(None);
                }
                Ok(None) => {
                    warn!(
                        %extractor_id,
                        "Tried to poll from closed synchronizer during catch up.",
                    );
                    self.mark_closed();
                    return Ok(None);
                }
                Err(_) => {
                    debug!(%extractor_id, "Timed out waiting for catch-up");
                    break;
                }
            }
        }

        // Fold all collected messages into one, in arrival order.
        let merged = results
            .into_iter()
            .reduce(|l, r| l.merge(r));

        if let Some(msg) = merged {
            debug!(%extractor_id, "Delayed extractor made progress!");
            self.transition(msg.header.clone(), block_history, stale_threshold)?;
            Ok(Some(msg))
        } else {
            // Nothing arrived: the stream may have been silent long enough to be stale.
            self.check_and_transition_to_stale_if_needed(stale_threshold, None)?;
            Ok(None)
        }
    }
417
418 fn check_and_transition_to_stale_if_needed(
420 &mut self,
421 stale_threshold: std::time::Duration,
422 fallback_header: Option<BlockHeader>,
423 ) -> Result<bool, BlockSynchronizerError> {
424 let now = Local::now().naive_utc();
425 let wait_duration = now.signed_duration_since(self.modify_ts);
426 let stale_threshold_chrono = ChronoDuration::from_std(stale_threshold)
427 .map_err(|e| BlockSynchronizerError::DurationConversionError(e.to_string()))?;
428
429 if wait_duration > stale_threshold_chrono {
430 let header_to_use = match (&self.state, fallback_header) {
431 (SynchronizerState::Ready(h), _) |
432 (SynchronizerState::Delayed(h), _) |
433 (SynchronizerState::Stale(h), _) => h.clone(),
434 (_, Some(h)) => h,
435 _ => BlockHeader::default(),
436 };
437
438 warn!(
439 extractor_id=%self.extractor_id,
440 last_message_at=?self.modify_ts,
441 "SynchronizerStream transition to stale due to timeout."
442 );
443 self.state = SynchronizerState::Stale(header_to_use);
444 self.modify_ts = now;
445 Ok(true)
446 } else {
447 Ok(false)
448 }
449 }
450
451 fn transition(
458 &mut self,
459 latest_retrieved: BlockHeader,
460 block_history: &BlockHistory,
461 stale_threshold: std::time::Duration,
462 ) -> Result<(), BlockSynchronizerError> {
463 let extractor_id = self.extractor_id.clone();
464 let last_message_at = self.modify_ts;
465 let block = &latest_retrieved;
466
467 match block_history.determine_block_position(&latest_retrieved)? {
468 BlockPosition::NextExpected | BlockPosition::NextPartial => {
469 self.state = SynchronizerState::Ready(latest_retrieved.clone());
470 trace!(
471 next = %latest_retrieved,
472 extractor = %extractor_id,
473 "SynchronizerStream transition to next expected"
474 )
475 }
476 BlockPosition::Latest | BlockPosition::Delayed => {
477 if !self.check_and_transition_to_stale_if_needed(
478 stale_threshold,
479 Some(latest_retrieved.clone()),
480 )? {
481 warn!(
482 %extractor_id,
483 ?last_message_at,
484 %block,
485 "SynchronizerStream transition transition to delayed."
486 );
487 self.state = SynchronizerState::Delayed(latest_retrieved.clone());
488 }
489 }
490 BlockPosition::Advanced => {
491 info!(
492 %extractor_id,
493 ?last_message_at,
494 latest = opt(&block_history.latest()),
495 %block,
496 "SynchronizerStream transition to advanced."
497 );
498 self.state = SynchronizerState::Advanced(latest_retrieved.clone());
499 }
500 }
501 self.modify_ts = Local::now().naive_utc();
502 Ok(())
503 }
504
505 fn mark_errored(&mut self, error: SynchronizerError) {
510 self.state = SynchronizerState::Ended(error.to_string());
511 self.modify_ts = Local::now().naive_utc();
512 self.error = Some(error);
513 }
514
515 fn mark_closed(&mut self) {
521 if !matches!(self.state, SynchronizerState::Ended(_)) {
522 self.state = SynchronizerState::Ended("Closed".to_string());
523 self.modify_ts = Local::now().naive_utc();
524 }
525 }
526
527 fn mark_stale(&mut self, header: &BlockHeader) {
529 self.state = SynchronizerState::Stale(header.clone());
530 self.modify_ts = Local::now().naive_utc();
531 }
532
533 fn mark_ready(&mut self, header: &BlockHeader) {
535 self.state = SynchronizerState::Ready(header.clone());
536 self.modify_ts = Local::now().naive_utc();
537 }
538
    /// Returns `true` if the stream is in the `Ended` state.
    fn has_ended(&self) -> bool {
        matches!(self.state, SynchronizerState::Ended(_))
    }
542
    /// Returns `true` if the stream is in the `Stale` state.
    fn is_stale(&self) -> bool {
        matches!(self.state, SynchronizerState::Stale(_))
    }
546
    /// Returns `true` if the stream is in the `Advanced` state.
    fn is_advanced(&self) -> bool {
        matches!(self.state, SynchronizerState::Advanced(_))
    }
550
551 fn get_current_header(&self) -> Option<&BlockHeader> {
555 match &self.state {
556 SynchronizerState::Ready(b) |
557 SynchronizerState::Delayed(b) |
558 SynchronizerState::Advanced(b) => Some(b),
559 _ => None,
560 }
561 }
562}
563
/// A single message emitted by the block synchronizer feed.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct FeedMessage<H = BlockHeader>
where
    H: HeaderLike,
{
    /// State messages received since the previous feed message, keyed by extractor name.
    pub state_msgs: HashMap<String, StateSyncMessage<H>>,
    /// Current state of every synchronizer stream, keyed by extractor name.
    pub sync_states: HashMap<String, SynchronizerState>,
}
572
impl<H> FeedMessage<H>
where
    H: HeaderLike,
{
    /// Creates a feed message from the given state messages and synchronizer states.
    fn new(
        state_msgs: HashMap<String, StateSyncMessage<H>>,
        sync_states: HashMap<String, SynchronizerState>,
    ) -> Self {
        Self { state_msgs, sync_states }
    }
}
584
585impl<S> BlockSynchronizer<S>
586where
587 S: StateSynchronizer,
588{
589 pub fn new(
590 block_time: std::time::Duration,
591 latency_buffer: std::time::Duration,
592 max_missed_blocks: u64,
593 ) -> Self {
594 Self {
595 synchronizers: None,
596 max_messages: None,
597 block_time,
598 latency_buffer,
599 startup_timeout: block_time.mul_f64(max_missed_blocks as f64),
600 max_missed_blocks,
601 }
602 }
603
    /// Limits the feed to at most `val` messages; the main loop exits once the limit
    /// is reached.
    pub fn max_messages(&mut self, val: usize) {
        self.max_messages = Some(val);
    }
612
613 pub fn startup_timeout(mut self, val: Duration) {
617 self.startup_timeout = val;
618 }
619
620 pub fn register_synchronizer(mut self, id: ExtractorIdentity, synchronizer: S) -> Self {
621 let mut registered = self.synchronizers.unwrap_or_default();
622 registered.insert(id, synchronizer);
623 self.synchronizers = Some(registered);
624 self
625 }
626
    /// Test-only constructor: 10 ms block time, 10 ms latency buffer, 3 missed blocks.
    #[cfg(test)]
    pub fn with_short_timeouts() -> Self {
        Self::new(Duration::from_millis(10), Duration::from_millis(10), 3)
    }
631
632 async fn cleanup_synchronizers(
635 mut state_sync_tasks: FuturesUnordered<JoinHandle<()>>,
636 sync_close_senders: Vec<oneshot::Sender<()>>,
637 ) {
638 for close_sender in sync_close_senders {
640 let _ = close_sender.send(());
641 }
642
643 let mut completed_tasks = 0;
645 while let Ok(Some(_)) = timeout(Duration::from_secs(5), state_sync_tasks.next()).await {
646 completed_tasks += 1;
647 }
648
649 let remaining_tasks = state_sync_tasks.len();
651 if remaining_tasks > 0 {
652 warn!(
653 completed = completed_tasks,
654 timed_out = remaining_tasks,
655 "Some synchronizers timed out during cleanup and may not have shut down cleanly"
656 );
657 }
658 }
659
    /// Starts all registered synchronizers and the main synchronization loop.
    ///
    /// Returns the handle of a supervising ("nanny") task together with the receiver
    /// of the feed. The nanny task waits for the main loop to exit and then shuts the
    /// synchronizers down.
    ///
    /// # Errors
    ///
    /// - [`BlockSynchronizerError::NoSynchronizers`] if no synchronizer was registered.
    /// - An initialization error if any synchronizer fails to initialize.
    /// - [`BlockSynchronizerError::NoReadySynchronizers`] if no stream is healthy after startup.
    /// - A block history error if the initial block history cannot be created or is empty.
    pub async fn run(
        mut self,
    ) -> BlockSyncResult<(JoinHandle<()>, Receiver<BlockSyncResult<FeedMessage<BlockHeader>>>)>
    {
        trace!("Starting BlockSynchronizer...");
        let state_sync_tasks = FuturesUnordered::new();
        let mut synchronizers = self
            .synchronizers
            .take()
            .ok_or(BlockSynchronizerError::NoSynchronizers)?;
        // Initialize all synchronizers concurrently; the first failure aborts startup.
        let init_tasks = synchronizers
            .values_mut()
            .map(|s| s.initialize())
            .collect::<Vec<_>>();
        try_join_all(init_tasks).await?;

        // Start every synchronizer, keeping its task handle and close sender for shutdown.
        let mut sync_streams = Vec::with_capacity(synchronizers.len());
        let mut sync_close_senders = Vec::new();
        for (extractor_id, synchronizer) in synchronizers.drain() {
            let (handle, rx) = synchronizer.start().await;
            let (join_handle, close_sender) = handle.split();
            state_sync_tasks.push(join_handle);
            sync_close_senders.push(close_sender);

            sync_streams.push(SynchronizerStream::new(&extractor_id, rx));
        }

        debug!("Waiting for initial synchronizer messages...");
        // Wait for the first message of every stream concurrently, each bounded by the
        // startup timeout.
        let mut startup_futures = Vec::new();
        for synchronizer in sync_streams.iter_mut() {
            let fut = async {
                let res = timeout(self.startup_timeout, synchronizer.rx.recv()).await;
                (synchronizer, res)
            };
            startup_futures.push(fut);
        }

        let mut ready_sync_msgs = HashMap::new();
        let initial_headers = join_all(startup_futures)
            .await
            .into_iter()
            .filter_map(|(synchronizer, res)| {
                let extractor_id = synchronizer.extractor_id.clone();
                match res {
                    Ok(Some(Ok(msg))) => {
                        debug!(%extractor_id, height=?&msg.header.number, "Synchronizer started successfully!");
                        synchronizer.mark_ready(&msg.header);
                        let header = msg.header.clone();
                        ready_sync_msgs.insert(extractor_id.name.clone(), msg);
                        Some(header)
                    }
                    Ok(Some(Err(e))) => {
                        synchronizer.mark_errored(e);
                        None
                    }
                    Ok(None) => {
                        warn!(%extractor_id, "Synchronizer closed during startup");
                        synchronizer.mark_closed();
                        None
                    }
                    Err(_) => {
                        warn!(%extractor_id, "Timed out waiting for first message");
                        // No header was ever received, so a default header stands in.
                        synchronizer.mark_stale(&BlockHeader::default());
                        None
                    }
                }
            })
            // Going through a set removes headers reported by more than one synchronizer.
            .collect::<HashSet<_>>()
            .into_iter()
            .collect::<Vec<_>>();

        // At least one stream must be neither ended nor stale to continue.
        Self::check_streams(&sync_streams)?;
        let mut block_history = BlockHistory::new(initial_headers, 15)?;
        let start_header = block_history
            .latest()
            .ok_or(BlockHistoryError::EmptyHistory)?;
        info!(
            start_block=%start_header,
            n_healthy=ready_sync_msgs.len(),
            n_total=sync_streams.len(),
            "Block synchronization started successfully!"
        );

        // Streams that started on an older block than the history's latest are delayed.
        for stream in sync_streams.iter_mut() {
            if let SynchronizerState::Ready(header) = stream.state.clone() {
                if header.number < start_header.number {
                    debug!(
                        extractor_id=%stream.extractor_id,
                        synchronizer_block=header.number,
                        current_block=start_header.number,
                        "Marking synchronizer as delayed during initialization"
                    );
                    stream.state = SynchronizerState::Delayed(header);
                }
            }
        }

        let (sync_tx, sync_rx) = mpsc::channel(30);
        let main_loop_jh = tokio::spawn(async move {
            // Counts emitted messages; the first one is sent before the limit is checked.
            let mut n_iter = 1;
            loop {
                // Emit everything collected so far together with the current stream states;
                // `take` leaves an empty map behind for the next round.
                let msg = FeedMessage::new(
                    std::mem::take(&mut ready_sync_msgs),
                    sync_streams
                        .iter()
                        .map(|stream| (stream.extractor_id.name.to_string(), stream.state.clone()))
                        .collect(),
                );
                if sync_tx.send(Ok(msg)).await.is_err() {
                    info!("Receiver closed, block synchronizer terminating..");
                    return;
                };

                if let Some(max_messages) = self.max_messages {
                    if n_iter >= max_messages {
                        info!(max_messages, "StreamEnd");
                        return;
                    }
                }
                n_iter += 1;

                let res = self
                    .handle_next_message(
                        &mut sync_streams,
                        &mut ready_sync_msgs,
                        &mut block_history,
                    )
                    .await;

                if let Err(e) = res {
                    // Forward the error to the consumer (best effort) and stop the loop.
                    let _ = sync_tx.send(Err(e)).await;
                    return;
                }
            }
        });

        // The nanny task outlives the main loop so the synchronizers are always cleaned
        // up, even if the main loop panics.
        let nanny_jh = tokio::spawn(async move {
            let _ = main_loop_jh.await.map_err(|e| {
                if e.is_panic() {
                    error!("BlockSynchornizer main loop panicked: {e}")
                }
            });
            debug!("Main loop exited. Closing synchronizers");
            Self::cleanup_synchronizers(state_sync_tasks, sync_close_senders).await;
            debug!("Shutdown complete");
        });
        Ok((nanny_jh, sync_rx))
    }
834
    /// Advances every stream that has not ended by one step and updates the block history.
    ///
    /// Messages produced by the streams are added to `ready_sync_msgs`, keyed by
    /// extractor name. If any stream ended up `Advanced`, the block history is
    /// reinitialized; otherwise the highest current header is pushed onto it.
    ///
    /// # Errors
    ///
    /// Fails if advancing any stream fails, if no stream is healthy afterwards, or if
    /// the block history cannot be updated.
    async fn handle_next_message(
        &self,
        sync_streams: &mut [SynchronizerStream],
        ready_sync_msgs: &mut HashMap<String, StateSyncMessage<BlockHeader>>,
        block_history: &mut BlockHistory,
    ) -> BlockSyncResult<()> {
        let mut recv_futures = Vec::new();
        for stream in sync_streams.iter_mut() {
            if stream.has_ended() {
                continue;
            }
            recv_futures.push(async {
                let res = stream
                    .try_advance(
                        block_history,
                        self.block_time,
                        self.latency_buffer,
                        // Stale threshold: the time it takes to miss `max_missed_blocks` blocks.
                        self.block_time
                            .mul_f64(self.max_missed_blocks as f64),
                    )
                    .await?;
                Ok::<_, BlockSynchronizerError>(
                    res.map(|msg| (stream.extractor_id.name.clone(), msg)),
                )
            });
        }
        // Advance all streams concurrently; the first error (in stream order) is returned.
        ready_sync_msgs.extend(
            join_all(recv_futures)
                .await
                .into_iter()
                .collect::<Result<Vec<_>, _>>()?
                .into_iter()
                .flatten(),
        );

        // At least one stream must still be neither ended nor stale.
        Self::check_streams(sync_streams)?;

        if sync_streams
            .iter()
            .any(SynchronizerStream::is_advanced)
        {
            *block_history = Self::reinit_block_history(sync_streams, block_history)?;
        } else {
            let header = sync_streams
                .iter()
                .filter_map(SynchronizerStream::get_current_header)
                .max_by_key(|b| b.number)
                .ok_or(BlockSynchronizerError::NoReadySynchronizers(
                    "Expected to have at least one synchronizer that is not stale or ended"
                        .to_string(),
                ))?;
            block_history.push(header.clone())?;
        }
        Ok(())
    }
910
911 fn reinit_block_history(
920 sync_streams: &mut [SynchronizerStream],
921 block_history: &mut BlockHistory,
922 ) -> Result<BlockHistory, BlockSynchronizerError> {
923 let previous = block_history
924 .latest()
925 .ok_or(BlockHistoryError::EmptyHistory)?;
927 let blocks = sync_streams
928 .iter()
929 .filter_map(SynchronizerStream::get_current_header)
930 .cloned()
931 .collect();
932 let new_block_history = BlockHistory::new(blocks, 10)?;
933 let latest = new_block_history
934 .latest()
935 .ok_or(BlockHistoryError::EmptyHistory)?;
937 info!(
938 %previous,
939 %latest,
940 "Advanced synchronizer detected. Reinitialized block history."
941 );
942 sync_streams
943 .iter_mut()
944 .for_each(|stream| {
945 if let Some(header) = stream.get_current_header() {
948 if header.number < latest.number {
949 stream.state = SynchronizerState::Delayed(header.clone());
950 } else if header.number == latest.number {
951 stream.state = SynchronizerState::Ready(header.clone());
952 }
953 }
954 });
955 Ok(new_block_history)
956 }
957
958 fn check_streams(sync_streams: &[SynchronizerStream]) -> BlockSyncResult<()> {
963 let mut latest_errored_stream: Option<&SynchronizerStream> = None;
964
965 for stream in sync_streams.iter() {
966 if !stream.has_ended() && !stream.is_stale() {
968 return Ok(());
969 }
970
971 if latest_errored_stream.is_none() ||
973 stream.modify_ts >
974 latest_errored_stream
975 .as_ref()
976 .unwrap()
977 .modify_ts
978 {
979 latest_errored_stream = Some(stream);
980 }
981 }
982
983 let last_error_reason = if let Some(stream) = latest_errored_stream {
984 if let Some(err) = &stream.error {
985 format!("Synchronizer for {} errored with: {err}", stream.extractor_id)
986 } else {
987 format!("Synchronizer for {} became: {}", stream.extractor_id, stream.state)
988 }
989 } else {
990 return Err(BlockSynchronizerError::NoSynchronizers);
991 };
992
993 let mut reason = vec![last_error_reason];
994
995 sync_streams.iter().for_each(|stream| {
996 reason.push(format!(
997 "{} reported as {} at {}",
998 stream.extractor_id, stream.state, stream.modify_ts
999 ))
1000 });
1001
1002 Err(BlockSynchronizerError::NoReadySynchronizers(reason.join(", ")))
1003 }
1004}
1005
1006#[cfg(test)]
1007mod tests {
1008 use std::sync::Arc;
1009
1010 use async_trait::async_trait;
1011 use test_log::test;
1012 use tokio::sync::{oneshot, Mutex};
1013 use tycho_common::dto::Chain;
1014
1015 use super::*;
1016 use crate::feed::synchronizer::{SyncResult, SynchronizerTaskHandle};
1017
    /// Controls how the task spawned by [`MockStateSync`] behaves.
    #[derive(Clone, Debug)]
    enum MockBehavior {
        /// Waits for a close signal, records it, and exits.
        Normal,
        /// Never exits; sleeps in a loop and ignores close signals.
        IgnoreClose,
        /// Sends a simulated connection error and exits right away.
        ExitImmediately,
    }
1024
    /// Receiving end of the channel on which the mock delivers its messages.
    type HeaderReceiver = Receiver<SyncResult<StateSyncMessage<BlockHeader>>>;
1026
    /// Test double for a state synchronizer whose messages are fed manually by the test.
    #[derive(Clone)]
    struct MockStateSync {
        /// Sender used by tests (and the mock task) to inject messages.
        header_tx: mpsc::Sender<SyncResult<StateSyncMessage<BlockHeader>>>,
        /// Receiver handed out once by `start`; `None` afterwards.
        header_rx: Arc<Mutex<Option<HeaderReceiver>>>,
        /// Set to `true` once the task with `Normal` behavior received a close signal.
        close_received: Arc<Mutex<bool>>,
        /// Behavior of the task spawned by `start`.
        behavior: MockBehavior,
        /// Close sender that lets the test stop the task; populated by `start`.
        close_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
    }
1036
1037 impl MockStateSync {
1038 fn new() -> Self {
1039 Self::with_behavior(MockBehavior::Normal)
1040 }
1041
1042 fn with_behavior(behavior: MockBehavior) -> Self {
1043 let (tx, rx) = mpsc::channel(1);
1044 Self {
1045 header_tx: tx,
1046 header_rx: Arc::new(Mutex::new(Some(rx))),
1047 close_received: Arc::new(Mutex::new(false)),
1048 behavior,
1049 close_tx: Arc::new(Mutex::new(None)),
1050 }
1051 }
1052
1053 async fn was_close_received(&self) -> bool {
1054 *self.close_received.lock().await
1055 }
1056
1057 async fn send_header(&self, header: StateSyncMessage<BlockHeader>) -> Result<(), String> {
1058 self.header_tx
1059 .send(Ok(header))
1060 .await
1061 .map_err(|e| format!("sending header failed: {e}"))
1062 }
1063
1064 async fn trigger_close(&self) {
1066 if let Some(close_tx) = self.close_tx.lock().await.take() {
1067 let _ = close_tx.send(());
1068 }
1069 }
1070 }
1071
    #[async_trait]
    impl StateSynchronizer for MockStateSync {
        /// Initialization always succeeds for the mock.
        async fn initialize(&mut self) -> SyncResult<()> {
            Ok(())
        }

        /// Hands out the message receiver and spawns a task that behaves according to
        /// the configured [`MockBehavior`].
        ///
        /// # Panics
        ///
        /// Panics if called more than once on clones sharing the same receiver.
        async fn start(
            mut self,
        ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>) {
            let block_rx = {
                let mut guard = self.header_rx.lock().await;
                guard
                    .take()
                    .expect("Block receiver was not set!")
            };

            // Two independent close channels: one owned by the task handle, one kept by
            // the mock so the test can stop the task itself.
            let (close_tx_for_handle, close_rx) = oneshot::channel();
            let (close_tx_for_test, close_rx_for_test) = oneshot::channel();

            {
                let mut guard = self.close_tx.lock().await;
                *guard = Some(close_tx_for_test);
            }

            let behavior = self.behavior.clone();
            let close_received_clone = self.close_received.clone();
            let tx = self.header_tx.clone();

            let jh = tokio::spawn(async move {
                match behavior {
                    MockBehavior::IgnoreClose => {
                        // Never returns, so close signals are never observed.
                        loop {
                            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
                        }
                    }
                    MockBehavior::ExitImmediately => {
                        tx.send(SyncResult::Err(SynchronizerError::ConnectionError(
                            "Simulated immediate task failure".to_string(),
                        )))
                        .await
                        .unwrap();
                    }
                    MockBehavior::Normal => {
                        // Whichever close channel resolves first ends the task.
                        let _ = tokio::select! {
                            result = close_rx => result,
                            result = close_rx_for_test => result,
                        };
                        let mut guard = close_received_clone.lock().await;
                        *guard = true;
                    }
                }
            });

            let handle = SynchronizerTaskHandle::new(jh, close_tx_for_handle);
            (handle, block_rx)
        }
    }
1137
1138 fn header_message(block: u8) -> StateSyncMessage<BlockHeader> {
1139 StateSyncMessage {
1140 header: BlockHeader {
1141 number: block as u64,
1142 hash: Bytes::from(vec![block]),
1143 parent_hash: Bytes::from(vec![block - 1]),
1144 revert: false,
1145 timestamp: 1000,
1146 partial_block_index: None,
1147 },
1148 ..Default::default()
1149 }
1150 }
1151
1152 fn partial_header_message(block: u8, partial_idx: u32) -> StateSyncMessage<BlockHeader> {
1155 let hash_bytes =
1157 [(block as u64).to_be_bytes().as_slice(), partial_idx.to_be_bytes().as_slice()]
1158 .concat();
1159 StateSyncMessage {
1160 header: BlockHeader {
1161 number: block as u64,
1162 hash: Bytes::from(hash_bytes),
1163 parent_hash: Bytes::from(vec![block - 1]),
1164 revert: false,
1165 timestamp: 1000,
1166 partial_block_index: Some(partial_idx),
1167 },
1168 ..Default::default()
1169 }
1170 }
1171
1172 fn revert_header_message(block: u8) -> StateSyncMessage<BlockHeader> {
1173 StateSyncMessage {
1174 header: BlockHeader {
1175 number: block as u64,
1176 hash: Bytes::from(vec![block]),
1177 parent_hash: Bytes::from(vec![block - 1]),
1178 revert: true,
1179 timestamp: 1000,
1180 partial_block_index: None,
1181 },
1182 ..Default::default()
1183 }
1184 }
1185
1186 async fn receive_message(rx: &mut Receiver<BlockSyncResult<FeedMessage>>) -> FeedMessage {
1187 timeout(Duration::from_millis(100), rx.recv())
1188 .await
1189 .expect("Responds in time")
1190 .expect("Should receive first message")
1191 .expect("No error")
1192 }
1193
    /// Sets up a running block synchronizer with two mocks that both use
    /// [`MockBehavior::Normal`]; see `setup_block_sync_with_behaviour`.
    async fn setup_block_sync(
    ) -> (MockStateSync, MockStateSync, JoinHandle<()>, Receiver<BlockSyncResult<FeedMessage>>)
    {
        setup_block_sync_with_behaviour(MockBehavior::Normal, MockBehavior::Normal).await
    }
1199
1200 async fn setup_block_sync_with_behaviour(
1202 v2_behavior: MockBehavior,
1203 v3_behavior: MockBehavior,
1204 ) -> (MockStateSync, MockStateSync, JoinHandle<()>, Receiver<BlockSyncResult<FeedMessage>>)
1205 {
1206 let v2_sync = MockStateSync::with_behavior(v2_behavior);
1207 let v3_sync = MockStateSync::with_behavior(v3_behavior);
1208
1209 let mut block_sync = BlockSynchronizer::new(
1211 Duration::from_millis(20), Duration::from_millis(10), 3, );
1215 block_sync.max_messages(10); let block_sync = block_sync
1218 .register_synchronizer(
1219 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
1220 v2_sync.clone(),
1221 )
1222 .register_synchronizer(
1223 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
1224 v3_sync.clone(),
1225 );
1226
1227 let block1_msg = header_message(1);
1229 let _ = v2_sync
1230 .send_header(block1_msg.clone())
1231 .await;
1232 let _ = v3_sync
1233 .send_header(block1_msg.clone())
1234 .await;
1235
1236 let (nanny_handle, mut rx) = block_sync
1238 .run()
1239 .await
1240 .expect("BlockSynchronizer failed to start");
1241
1242 let first_feed_msg = receive_message(&mut rx).await;
1243 assert_eq!(first_feed_msg.state_msgs.len(), 2);
1244 assert!(matches!(
1245 first_feed_msg
1246 .sync_states
1247 .get("uniswap-v2")
1248 .unwrap(),
1249 SynchronizerState::Ready(_)
1250 ));
1251 assert!(matches!(
1252 first_feed_msg
1253 .sync_states
1254 .get("uniswap-v3")
1255 .unwrap(),
1256 SynchronizerState::Ready(_)
1257 ));
1258
1259 (v2_sync, v3_sync, nanny_handle, rx)
1260 }
1261
1262 async fn shutdown_block_synchronizer(
1263 v2_sync: &MockStateSync,
1264 v3_sync: &MockStateSync,
1265 nanny_handle: JoinHandle<()>,
1266 ) {
1267 v3_sync.trigger_close().await;
1268 v2_sync.trigger_close().await;
1269
1270 timeout(Duration::from_millis(100), nanny_handle)
1271 .await
1272 .expect("Nanny failed to exit within time")
1273 .expect("Nanny panicked");
1274 }
1275
1276 async fn send_and_assert_ready(
1278 sync: &MockStateSync,
1279 sync_name: &str,
1280 rx: &mut Receiver<BlockSyncResult<FeedMessage>>,
1281 msg: StateSyncMessage<BlockHeader>,
1282 expected_block: u64,
1283 expected_partial: Option<u32>,
1284 ) {
1285 sync.send_header(msg)
1286 .await
1287 .expect("send failed");
1288 let feed_msg = receive_message(rx).await;
1289 let state = feed_msg
1290 .sync_states
1291 .get(sync_name)
1292 .unwrap();
1293 match state {
1294 SynchronizerState::Ready(h) => {
1295 assert_eq!(h.number, expected_block, "wrong block number");
1296 assert_eq!(h.partial_block_index, expected_partial, "wrong partial index");
1297 }
1298 other => panic!("expected Ready, got {:?}", other),
1299 }
1300 }
1301
1302 #[test(tokio::test)]
1303 async fn test_two_ready_synchronizers() {
1304 let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1305
1306 let second_msg = header_message(2);
1307 v2_sync
1308 .send_header(second_msg.clone())
1309 .await
1310 .expect("send_header failed");
1311 v3_sync
1312 .send_header(second_msg.clone())
1313 .await
1314 .expect("send_header failed");
1315 let second_feed_msg = receive_message(&mut rx).await;
1316
1317 let exp2 = FeedMessage {
1318 state_msgs: [
1319 ("uniswap-v2".to_string(), second_msg.clone()),
1320 ("uniswap-v3".to_string(), second_msg.clone()),
1321 ]
1322 .into_iter()
1323 .collect(),
1324 sync_states: [
1325 ("uniswap-v3".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
1326 ("uniswap-v2".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
1327 ]
1328 .into_iter()
1329 .collect(),
1330 };
1331 assert_eq!(second_feed_msg, exp2);
1332
1333 shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1334 }
1335
    /// A synchronizer that misses a block is reported as `Delayed` and becomes `Ready`
    /// again once it has delivered the missing blocks.
    #[test(tokio::test)]
    async fn test_delayed_synchronizer_catches_up() {
        let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;

        // Only v2 delivers block 2; v3 stays silent and must fall behind.
        let block2_msg = header_message(2);
        v2_sync
            .send_header(block2_msg.clone())
            .await
            .expect("send_header failed");

        let second_feed_msg = receive_message(&mut rx).await;
        debug!("Consumed second message for v2");

        assert!(second_feed_msg
            .state_msgs
            .contains_key("uniswap-v2"));
        assert!(matches!(
            second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
            SynchronizerState::Ready(header) if header.number == 2
        ));
        assert!(!second_feed_msg
            .state_msgs
            .contains_key("uniswap-v3"));
        assert!(matches!(
            second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
            SynchronizerState::Delayed(header) if header.number == 1
        ));

        // v3 now delivers the block it missed.
        v3_sync
            .send_header(block2_msg.clone())
            .await
            .expect("send_header failed");

        // Both synchronizers then deliver block 3.
        let block3_msg = header_message(3);
        v2_sync
            .send_header(block3_msg.clone())
            .await
            .expect("send_header failed");
        v3_sync
            .send_header(block3_msg)
            .await
            .expect("send_header failed");

        let mut third_feed_msg = receive_message(&mut rx).await;

        // Depending on timing, the first message after the catch-up may not contain
        // v2's block 3 yet; in that case the following message is the one to check.
        if !third_feed_msg
            .state_msgs
            .contains_key("uniswap-v2")
        {
            third_feed_msg = rx
                .recv()
                .await
                .expect("header channel was closed")
                .expect("no error");
        }
        assert!(third_feed_msg
            .state_msgs
            .contains_key("uniswap-v2"));
        assert!(third_feed_msg
            .state_msgs
            .contains_key("uniswap-v3"));
        assert!(matches!(
            third_feed_msg.sync_states.get("uniswap-v2").unwrap(),
            SynchronizerState::Ready(header) if header.number == 3
        ));
        assert!(matches!(
            third_feed_msg.sync_states.get("uniswap-v3").unwrap(),
            SynchronizerState::Ready(header) if header.number == 3
        ));

        shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
    }
1416
1417 #[test(tokio::test)]
1418 async fn test_different_start_blocks() {
1419 let v2_sync = MockStateSync::new();
1420 let v3_sync = MockStateSync::new();
1421 let block_sync = BlockSynchronizer::with_short_timeouts()
1422 .register_synchronizer(
1423 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
1424 v2_sync.clone(),
1425 )
1426 .register_synchronizer(
1427 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
1428 v3_sync.clone(),
1429 );
1430
1431 let block1_msg = header_message(1);
1433 let block2_msg = header_message(2);
1434
1435 let _ = v2_sync
1436 .send_header(block1_msg.clone())
1437 .await;
1438 v3_sync
1439 .send_header(block2_msg.clone())
1440 .await
1441 .expect("send_header failed");
1442
1443 let (jh, mut rx) = block_sync
1445 .run()
1446 .await
1447 .expect("BlockSynchronizer failed to start.");
1448
1449 let first_feed_msg = receive_message(&mut rx).await;
1451 assert!(matches!(
1452 first_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1453 SynchronizerState::Delayed(header) if header.number == 1
1454 ));
1455 assert!(matches!(
1456 first_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1457 SynchronizerState::Ready(header) if header.number == 2
1458 ));
1459
1460 v2_sync
1462 .send_header(block2_msg.clone())
1463 .await
1464 .expect("send_header failed");
1465
1466 let block3_msg = header_message(3);
1468 let _ = v2_sync
1469 .send_header(block3_msg.clone())
1470 .await;
1471 v3_sync
1472 .send_header(block3_msg.clone())
1473 .await
1474 .expect("send_header failed");
1475
1476 let second_feed_msg = receive_message(&mut rx).await;
1478 assert_eq!(second_feed_msg.state_msgs.len(), 2);
1479 assert!(matches!(
1480 second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1481 SynchronizerState::Ready(header) if header.number == 3
1482 ));
1483 assert!(matches!(
1484 second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1485 SynchronizerState::Ready(header) if header.number == 3
1486 ));
1487
1488 shutdown_block_synchronizer(&v2_sync, &v3_sync, jh).await;
1489 }
1490
1491 #[test(tokio::test)]
1492 async fn test_synchronizer_fails_other_goes_stale() {
1493 let (_v2_sync, v3_sync, nanny_handle, mut sync_rx) =
1494 setup_block_sync_with_behaviour(MockBehavior::ExitImmediately, MockBehavior::Normal)
1495 .await;
1496
1497 let mut error_reported = false;
1498 for _ in 0..3 {
1499 if let Some(msg) = sync_rx.recv().await {
1500 match msg {
1501 Err(_) => error_reported = true,
1502 Ok(msg) => {
1503 assert!(matches!(
1504 msg.sync_states
1505 .get("uniswap-v3")
1506 .unwrap(),
1507 SynchronizerState::Delayed(_)
1508 ));
1509 assert!(matches!(
1510 msg.sync_states
1511 .get("uniswap-v2")
1512 .unwrap(),
1513 SynchronizerState::Ended(_)
1514 ));
1515 }
1516 }
1517 }
1518 }
1519 assert!(error_reported, "BlockSynchronizer did not report final error");
1520
1521 let result = timeout(Duration::from_secs(2), nanny_handle).await;
1523 assert!(result.is_ok(), "Nanny should complete when synchronizer task exits");
1524
1525 assert!(
1527 v3_sync.was_close_received().await,
1528 "v3_sync should have received close signal during cleanup"
1529 );
1530 }
1531
1532 #[test(tokio::test)]
1533 async fn test_cleanup_timeout_warning() {
1534 let (_v2_sync, _v3_sync, nanny_handle, _rx) = setup_block_sync_with_behaviour(
1537 MockBehavior::ExitImmediately,
1538 MockBehavior::IgnoreClose,
1539 )
1540 .await;
1541
1542 let result = timeout(Duration::from_secs(10), nanny_handle).await;
1544 assert!(
1545 result.is_ok(),
1546 "Nanny should complete even when some synchronizers timeout during cleanup"
1547 );
1548
1549 }
1553
1554 #[test(tokio::test)]
1555 async fn test_one_synchronizer_goes_stale_while_other_works() {
1556 let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1558
1559 let block2_msg = header_message(2);
1561 let _ = v3_sync
1562 .send_header(block2_msg.clone())
1563 .await;
1564 let second_feed_msg = receive_message(&mut rx).await;
1568 assert!(second_feed_msg
1569 .state_msgs
1570 .contains_key("uniswap-v3"));
1571 assert!(!second_feed_msg
1572 .state_msgs
1573 .contains_key("uniswap-v2"));
1574 assert!(matches!(
1575 second_feed_msg
1576 .sync_states
1577 .get("uniswap-v3")
1578 .unwrap(),
1579 SynchronizerState::Ready(_)
1580 ));
1581 if let Some(v2_state) = second_feed_msg
1583 .sync_states
1584 .get("uniswap-v2")
1585 {
1586 if matches!(v2_state, SynchronizerState::Delayed(_)) {
1587 assert!(
1589 !nanny_handle.is_finished(),
1590 "Nanny should still be running when synchronizer is delayed (not stale yet)"
1591 );
1592 }
1593 }
1594
1595 tokio::time::sleep(Duration::from_millis(15)).await;
1597
1598 let block3_msg = header_message(3);
1600 let _ = v3_sync
1601 .send_header(block3_msg.clone())
1602 .await;
1603
1604 tokio::time::sleep(Duration::from_millis(40)).await;
1605
1606 let mut stale_found = false;
1607 for _ in 0..2 {
1608 if let Some(Ok(msg)) = rx.recv().await {
1609 if let Some(SynchronizerState::Stale(_)) = msg.sync_states.get("uniswap-v2") {
1610 stale_found = true;
1611 }
1612 }
1613 }
1614 assert!(stale_found, "v2 synchronizer should be stale");
1615
1616 shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1617 }
1618
1619 #[test(tokio::test)]
1620 async fn test_all_synchronizers_go_stale_main_loop_exits() {
1621 let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1623
1624 let mut seen_delayed = false;
1629
1630 let timeout_duration = Duration::from_millis(500); let start_time = tokio::time::Instant::now();
1634
1635 while let Ok(Some(Ok(msg))) =
1636 tokio::time::timeout(Duration::from_millis(50), rx.recv()).await
1637 {
1638 if !seen_delayed {
1640 let v2_state = msg.sync_states.get("uniswap-v2");
1641 let v3_state = msg.sync_states.get("uniswap-v3");
1642
1643 if matches!(v2_state, Some(SynchronizerState::Delayed(_))) ||
1644 matches!(v3_state, Some(SynchronizerState::Delayed(_)))
1645 {
1646 seen_delayed = true;
1647 assert!(!nanny_handle.is_finished(),
1649 "Nanny should still be running when synchronizers are delayed (not stale yet)");
1650 break;
1652 }
1653 }
1654
1655 if start_time.elapsed() > timeout_duration {
1657 break;
1658 }
1659 }
1660 assert!(seen_delayed, "Synchronizers should transition to Delayed state first");
1662
1663 let mut error_reported = false;
1664 while let Some(msg) = rx.recv().await {
1666 if let Err(e) = msg {
1667 assert!(e
1668 .to_string()
1669 .contains("became: Stale(1)"));
1670 assert!(e
1671 .to_string()
1672 .contains("reported as Stale(1)"));
1673 error_reported = true;
1674 }
1675 }
1676 assert!(error_reported, "Expected the channel to report an error before closing");
1677
1678 let nanny_result = timeout(Duration::from_secs(2), nanny_handle).await;
1680 assert!(nanny_result.is_ok(), "Nanny should complete when main loop exits");
1681
1682 assert!(
1684 v2_sync.was_close_received().await,
1685 "v2_sync should have received close signal during cleanup"
1686 );
1687 assert!(
1688 v3_sync.was_close_received().await,
1689 "v3_sync should have received close signal during cleanup"
1690 );
1691 }
1692
1693 #[test(tokio::test)]
1694 async fn test_stale_synchronizer_recovers() {
1695 let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1697
1698 tokio::time::sleep(Duration::from_millis(50)).await;
1700 let block2_msg = header_message(2);
1701 let _ = v2_sync
1702 .send_header(block2_msg.clone())
1703 .await;
1704
1705 for _ in 0..2 {
1707 if let Some(msg) = rx.recv().await {
1708 if let Ok(msg) = msg {
1709 if matches!(
1710 msg.sync_states
1711 .get("uniswap-v2")
1712 .unwrap(),
1713 SynchronizerState::Ready(_)
1714 ) {
1715 assert!(matches!(
1716 msg.sync_states
1717 .get("uniswap-v3")
1718 .unwrap(),
1719 SynchronizerState::Delayed(_)
1720 ));
1721 break;
1722 };
1723 }
1724 } else {
1725 panic!("Channel closed unexpectedly")
1726 }
1727 }
1728
1729 tokio::time::sleep(Duration::from_millis(15)).await;
1731 let block3_msg = header_message(3);
1732 let _ = v2_sync
1733 .send_header(block3_msg.clone())
1734 .await;
1735 let third_msg = receive_message(&mut rx).await;
1736 dbg!(&third_msg);
1737 assert!(matches!(
1738 third_msg
1739 .sync_states
1740 .get("uniswap-v2")
1741 .unwrap(),
1742 SynchronizerState::Ready(_)
1743 ));
1744 assert!(matches!(
1745 third_msg
1746 .sync_states
1747 .get("uniswap-v3")
1748 .unwrap(),
1749 SynchronizerState::Stale(_)
1750 ));
1751
1752 let block4_msg = header_message(4);
1753 let _ = v3_sync
1754 .send_header(block2_msg.clone())
1755 .await;
1756 let _ = v3_sync
1757 .send_header(block3_msg.clone())
1758 .await;
1759 let _ = v3_sync
1760 .send_header(block4_msg.clone())
1761 .await;
1762 let _ = v2_sync
1763 .send_header(block4_msg.clone())
1764 .await;
1765 let fourth_msg = receive_message(&mut rx).await;
1766 assert!(matches!(
1767 fourth_msg
1768 .sync_states
1769 .get("uniswap-v2")
1770 .unwrap(),
1771 SynchronizerState::Ready(_)
1772 ));
1773 assert!(matches!(
1774 fourth_msg
1775 .sync_states
1776 .get("uniswap-v3")
1777 .unwrap(),
1778 SynchronizerState::Ready(_)
1779 ));
1780
1781 shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1782
1783 assert!(
1785 v2_sync.was_close_received().await,
1786 "v2_sync should have received close signal during cleanup"
1787 );
1788 assert!(
1789 v3_sync.was_close_received().await,
1790 "v3_sync should have received close signal during cleanup"
1791 );
1792 }
1793
1794 #[test(tokio::test)]
1795 async fn test_all_synchronizer_advanced() {
1796 let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1800
1801 let block3 = header_message(3);
1802 v2_sync
1803 .send_header(block3.clone())
1804 .await
1805 .unwrap();
1806 v3_sync
1807 .send_header(block3)
1808 .await
1809 .unwrap();
1810
1811 let msg = receive_message(&mut rx).await;
1812 matches!(
1813 msg.sync_states
1814 .get("uniswap-v2")
1815 .unwrap(),
1816 SynchronizerState::Ready(_)
1817 );
1818 matches!(
1819 msg.sync_states
1820 .get("uniswap-v3")
1821 .unwrap(),
1822 SynchronizerState::Ready(_)
1823 );
1824
1825 shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1826 }
1827
1828 #[test(tokio::test)]
1829 async fn test_one_synchronizer_advanced() {
1830 let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1831
1832 let block2 = header_message(2);
1833 let block4 = header_message(4);
1834 v2_sync
1835 .send_header(block4.clone())
1836 .await
1837 .unwrap();
1838 v3_sync
1839 .send_header(block2.clone())
1840 .await
1841 .unwrap();
1842
1843 let msg = receive_message(&mut rx).await;
1844 matches!(
1845 msg.sync_states
1846 .get("uniswap-v2")
1847 .unwrap(),
1848 SynchronizerState::Ready(_)
1849 );
1850 matches!(
1851 msg.sync_states
1852 .get("uniswap-v3")
1853 .unwrap(),
1854 SynchronizerState::Delayed(_)
1855 );
1856
1857 shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1858 }
1859
1860 #[test(tokio::test)]
1861 async fn test_partial_blocks_normal_operation() {
1862 let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1865
1866 send_and_assert_ready(
1868 &v2_sync,
1869 "uniswap-v2",
1870 &mut rx,
1871 partial_header_message(2, 0),
1872 2,
1873 Some(0),
1874 )
1875 .await;
1876 send_and_assert_ready(
1877 &v2_sync,
1878 "uniswap-v2",
1879 &mut rx,
1880 partial_header_message(2, 3),
1881 2,
1882 Some(3),
1883 )
1884 .await;
1885 send_and_assert_ready(
1886 &v2_sync,
1887 "uniswap-v2",
1888 &mut rx,
1889 partial_header_message(2, 7),
1890 2,
1891 Some(7),
1892 )
1893 .await;
1894
1895 send_and_assert_ready(
1897 &v2_sync,
1898 "uniswap-v2",
1899 &mut rx,
1900 partial_header_message(3, 0),
1901 3,
1902 Some(0),
1903 )
1904 .await;
1905 send_and_assert_ready(
1906 &v2_sync,
1907 "uniswap-v2",
1908 &mut rx,
1909 partial_header_message(3, 2),
1910 3,
1911 Some(2),
1912 )
1913 .await;
1914
1915 shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1916 }
1917
1918 #[test(tokio::test)]
1919 async fn test_partial_blocks_handles_reverts() {
1920 let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1923
1924 send_and_assert_ready(&v2_sync, "uniswap-v2", &mut rx, header_message(2), 2, None).await;
1926
1927 send_and_assert_ready(
1929 &v2_sync,
1930 "uniswap-v2",
1931 &mut rx,
1932 partial_header_message(3, 0),
1933 3,
1934 Some(0),
1935 )
1936 .await;
1937 send_and_assert_ready(
1938 &v2_sync,
1939 "uniswap-v2",
1940 &mut rx,
1941 partial_header_message(3, 2),
1942 3,
1943 Some(2),
1944 )
1945 .await;
1946
1947 send_and_assert_ready(&v2_sync, "uniswap-v2", &mut rx, revert_header_message(2), 2, None)
1949 .await;
1950
1951 send_and_assert_ready(&v2_sync, "uniswap-v2", &mut rx, header_message(3), 3, None).await;
1953
1954 shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1955 }
1956
1957 #[test(tokio::test)]
1958 async fn test_partial_blocks_delayed_synchronizer_catches_up() {
1959 let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1962
1963 let partial_0 = partial_header_message(2, 0);
1965 v2_sync
1966 .send_header(partial_0.clone())
1967 .await
1968 .expect("send partial 0 failed");
1969
1970 let msg = receive_message(&mut rx).await;
1971 assert!(msg
1973 .state_msgs
1974 .contains_key("uniswap-v2"));
1975 assert!(!msg
1976 .state_msgs
1977 .contains_key("uniswap-v3"));
1978 assert!(matches!(
1979 msg.sync_states.get("uniswap-v2").unwrap(),
1980 SynchronizerState::Ready(h) if h.partial_block_index == Some(0)
1981 ));
1982 assert!(matches!(
1983 msg.sync_states
1984 .get("uniswap-v3")
1985 .unwrap(),
1986 SynchronizerState::Delayed(_)
1987 ));
1988
1989 let partial_2 = partial_header_message(2, 2);
1991 v2_sync
1992 .send_header(partial_2.clone())
1993 .await
1994 .expect("send partial 2 failed");
1995 v3_sync
1996 .send_header(partial_0.clone())
1997 .await
1998 .expect("v3 catch up partial 0 failed");
1999 v3_sync
2000 .send_header(partial_2.clone())
2001 .await
2002 .expect("v3 catch up partial 2 failed");
2003
2004 let mut v3_ready = false;
2006 for _ in 0..3 {
2007 let msg = receive_message(&mut rx).await;
2008 if matches!(
2009 msg.sync_states.get("uniswap-v3").unwrap(),
2010 SynchronizerState::Ready(h) if h.partial_block_index == Some(2)
2011 ) {
2012 v3_ready = true;
2013 break;
2014 }
2015 }
2016 assert!(v3_ready, "v3 caught up to partial 2");
2017
2018 shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
2019 }
2020}