1use std::{
2 collections::{HashMap, HashSet},
3 fmt::{Display, Formatter},
4 time::Duration,
5};
6
7use chrono::{Duration as ChronoDuration, Local, NaiveDateTime};
8use futures03::{future::join_all, stream::FuturesUnordered, FutureExt, StreamExt};
9use serde::{Deserialize, Serialize};
10use thiserror::Error;
11use tokio::{
12 sync::{
13 mpsc::{self, Receiver},
14 oneshot,
15 },
16 task::JoinHandle,
17 time::timeout,
18};
19use tracing::{debug, error, info, trace, warn};
20use tycho_common::{
21 display::opt,
22 models::{blockchain::BlockAggregatedChanges, ExtractorIdentity},
23 Bytes,
24};
25
26use crate::feed::{
27 block_history::{BlockHistory, BlockHistoryError, BlockPosition},
28 synchronizer::{StateSyncMessage, StateSynchronizer, SyncResult, SynchronizerError},
29};
30
31mod block_history;
32pub mod component_tracker;
33pub mod dto;
34pub mod synchronizer;
35
36pub trait HeaderLike {
41 fn block(self) -> Option<BlockHeader>;
42 fn block_number_or_timestamp(self) -> u64;
43}
44
45#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, Eq, Hash)]
46pub struct BlockHeader {
47 pub hash: Bytes,
48 pub number: u64,
49 pub parent_hash: Bytes,
50 pub revert: bool,
51 pub timestamp: u64,
52 #[serde(default, skip_serializing_if = "Option::is_none")]
54 pub partial_block_index: Option<u32>,
55}
56
57impl BlockHeader {
58 fn is_partial(&self) -> bool {
59 self.partial_block_index.is_some()
60 }
61}
62
63impl Display for BlockHeader {
64 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
65 let short_hash = if self.hash.len() >= 4 {
67 hex::encode(&self.hash[..4]) } else {
69 hex::encode(&self.hash)
70 };
71
72 match self.partial_block_index {
73 Some(idx) => write!(f, "Block #{} [0x{}..] (partial {})", self.number, short_hash, idx),
74 None => write!(f, "Block #{} [0x{}..]", self.number, short_hash),
75 }
76 }
77}
78
79impl From<&BlockAggregatedChanges> for BlockHeader {
80 fn from(block_changes: &BlockAggregatedChanges) -> Self {
81 let block = &block_changes.block;
82 Self {
83 hash: block.hash.clone(),
84 number: block.number,
85 parent_hash: block.parent_hash.clone(),
86 revert: block_changes.revert,
87 timestamp: block.ts.and_utc().timestamp() as u64,
88 partial_block_index: block_changes.partial_block_index,
89 }
90 }
91}
92
93impl HeaderLike for BlockHeader {
94 fn block(self) -> Option<BlockHeader> {
95 Some(self)
96 }
97
98 fn block_number_or_timestamp(self) -> u64 {
99 self.number
100 }
101}
102
103#[derive(Error, Debug)]
104pub enum BlockSynchronizerError {
105 #[error("Failed to initialize extractor '{extractor}': {source}")]
106 InitializationError {
107 extractor: ExtractorIdentity,
108 #[source]
109 source: SynchronizerError,
110 },
111
112 #[error("Failed to process new block: {0}")]
113 BlockHistoryError(#[from] BlockHistoryError),
114
115 #[error("Not a single synchronizer was ready: {0}")]
116 NoReadySynchronizers(String),
117
118 #[error("No synchronizers were set")]
119 NoSynchronizers,
120
121 #[error("Failed to convert duration: {0}")]
122 DurationConversionError(String),
123}
124
125type BlockSyncResult<T> = Result<T, BlockSynchronizerError>;
126
127pub struct BlockSynchronizer<S> {
175 synchronizers: Option<HashMap<ExtractorIdentity, S>>,
176 block_time: std::time::Duration,
178 latency_buffer: std::time::Duration,
180 startup_timeout: std::time::Duration,
182 max_messages: Option<usize>,
184 max_missed_blocks: u64,
186}
187
188#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
189#[serde(tag = "status", rename_all = "lowercase")]
190pub enum SynchronizerState {
191 Started,
193 Ready(BlockHeader),
195 Delayed(BlockHeader),
198 Stale(BlockHeader),
202 Advanced(BlockHeader),
207 Ended(String),
209}
210
211impl Display for SynchronizerState {
212 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
213 match self {
214 SynchronizerState::Started => write!(f, "Started"),
215 SynchronizerState::Ready(b) => write!(f, "Started({})", b.number),
216 SynchronizerState::Delayed(b) => write!(f, "Delayed({})", b.number),
217 SynchronizerState::Stale(b) => write!(f, "Stale({})", b.number),
218 SynchronizerState::Advanced(b) => write!(f, "Advanced({})", b.number),
219 SynchronizerState::Ended(reason) => write!(f, "Ended({})", reason),
220 }
221 }
222}
223
224pub struct SynchronizerStream {
225 extractor_id: ExtractorIdentity,
226 state: SynchronizerState,
227 error: Option<SynchronizerError>,
228 modify_ts: NaiveDateTime,
229 rx: Receiver<SyncResult<StateSyncMessage<BlockHeader>>>,
230}
231
232impl SynchronizerStream {
233 fn new(
234 extractor_id: &ExtractorIdentity,
235 rx: Receiver<SyncResult<StateSyncMessage<BlockHeader>>>,
236 ) -> Self {
237 Self {
238 extractor_id: extractor_id.clone(),
239 state: SynchronizerState::Started,
240 error: None,
241 modify_ts: Local::now().naive_utc(),
242 rx,
243 }
244 }
245 async fn try_advance(
246 &mut self,
247 block_history: &BlockHistory,
248 block_time: std::time::Duration,
249 latency_buffer: std::time::Duration,
250 stale_threshold: std::time::Duration,
251 ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
252 let extractor_id = self.extractor_id.clone();
253 let latest_block = block_history.latest();
254
255 match &self.state {
256 SynchronizerState::Started | SynchronizerState::Ended(_) => {
257 warn!(state=?&self.state, "Advancing Synchronizer in this state not supported!");
258 Ok(None)
259 }
260 SynchronizerState::Advanced(b) => {
261 let future_block = b.clone();
262 self.transition(future_block, block_history, stale_threshold)?;
264 Ok(None)
265 }
266 SynchronizerState::Ready(previous_block) => {
267 self.try_recv_next_expected(
269 block_time + latency_buffer,
270 block_history,
271 previous_block.clone(),
272 stale_threshold,
273 )
274 .await
275 }
276 SynchronizerState::Delayed(old_block) => {
277 debug!(
279 %old_block,
280 latest_block=opt(&latest_block),
281 %extractor_id,
282 "Trying to catch up to latest block"
283 );
284 self.try_catch_up(block_history, block_time + latency_buffer, stale_threshold)
285 .await
286 }
287 SynchronizerState::Stale(old_block) => {
288 debug!(
290 %old_block,
291 latest_block=opt(&latest_block),
292 %extractor_id,
293 "Trying to catch up stale synchronizer to latest block"
294 );
295 self.try_catch_up(block_history, block_time, stale_threshold)
296 .await
297 }
298 }
299 }
300
301 async fn try_recv_next_expected(
309 &mut self,
310 max_wait: std::time::Duration,
311 block_history: &BlockHistory,
312 previous_block: BlockHeader,
313 stale_threshold: std::time::Duration,
314 ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
315 let extractor_id = self.extractor_id.clone();
316 match timeout(max_wait, self.rx.recv()).await {
317 Ok(Some(Ok(msg))) => {
318 self.transition(msg.header.clone(), block_history, stale_threshold)?;
319 Ok(Some(msg))
320 }
321 Ok(Some(Err(e))) => {
322 self.mark_errored(e);
324 Ok(None)
325 }
326 Ok(None) => {
327 warn!(
330 %extractor_id,
331 "Tried to poll from closed synchronizer.",
332 );
333 self.mark_closed();
334 Ok(None)
335 }
336 Err(_) => {
337 debug!(%extractor_id, %previous_block, "No block received within time limit.");
339 self.state = SynchronizerState::Delayed(previous_block.clone());
342 self.modify_ts = Local::now().naive_utc();
343 Ok(None)
344 }
345 }
346 }
347
348 async fn try_catch_up(
354 &mut self,
355 block_history: &BlockHistory,
356 max_wait: std::time::Duration,
357 stale_threshold: std::time::Duration,
358 ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
359 let mut results = Vec::new();
360 let extractor_id = self.extractor_id.clone();
361
362 let deadline = std::time::Instant::now() + max_wait;
364
365 while std::time::Instant::now() < deadline {
366 match timeout(
367 deadline.saturating_duration_since(std::time::Instant::now()),
368 self.rx.recv(),
369 )
370 .await
371 {
372 Ok(Some(Ok(msg))) => {
373 debug!(%extractor_id, block=%msg.header, "Received new message during catch-up");
374 let block_pos = block_history.determine_block_position(&msg.header)?;
375 results.push(msg);
376 if matches!(block_pos, BlockPosition::NextExpected | BlockPosition::NextPartial)
377 {
378 break;
379 }
380 }
381 Ok(Some(Err(e))) => {
382 self.mark_errored(e);
384 return Ok(None);
385 }
386 Ok(None) => {
387 warn!(
390 %extractor_id,
391 "Tried to poll from closed synchronizer during catch up.",
392 );
393 self.mark_closed();
394 return Ok(None);
395 }
396 Err(_) => {
397 debug!(%extractor_id, "Timed out waiting for catch-up");
398 break;
399 }
400 }
401 }
402
403 let merged = results
404 .into_iter()
405 .reduce(|l, r| l.merge(r));
406
407 if let Some(msg) = merged {
408 debug!(%extractor_id, "Delayed extractor made progress!");
410 self.transition(msg.header.clone(), block_history, stale_threshold)?;
411 Ok(Some(msg))
412 } else {
413 self.check_and_transition_to_stale_if_needed(stale_threshold, None)?;
415 Ok(None)
416 }
417 }
418
419 fn check_and_transition_to_stale_if_needed(
421 &mut self,
422 stale_threshold: std::time::Duration,
423 fallback_header: Option<BlockHeader>,
424 ) -> Result<bool, BlockSynchronizerError> {
425 let now = Local::now().naive_utc();
426 let wait_duration = now.signed_duration_since(self.modify_ts);
427 let stale_threshold_chrono = ChronoDuration::from_std(stale_threshold)
428 .map_err(|e| BlockSynchronizerError::DurationConversionError(e.to_string()))?;
429
430 if wait_duration > stale_threshold_chrono {
431 let header_to_use = match (&self.state, fallback_header) {
432 (SynchronizerState::Ready(h), _) |
433 (SynchronizerState::Delayed(h), _) |
434 (SynchronizerState::Stale(h), _) => h.clone(),
435 (_, Some(h)) => h,
436 _ => BlockHeader::default(),
437 };
438
439 warn!(
440 extractor_id=%self.extractor_id,
441 last_message_at=?self.modify_ts,
442 "SynchronizerStream transition to stale due to timeout."
443 );
444 self.state = SynchronizerState::Stale(header_to_use);
445 self.modify_ts = now;
446 Ok(true)
447 } else {
448 Ok(false)
449 }
450 }
451
452 fn transition(
459 &mut self,
460 latest_retrieved: BlockHeader,
461 block_history: &BlockHistory,
462 stale_threshold: std::time::Duration,
463 ) -> Result<(), BlockSynchronizerError> {
464 let extractor_id = self.extractor_id.clone();
465 let last_message_at = self.modify_ts;
466 let block = &latest_retrieved;
467
468 match block_history.determine_block_position(&latest_retrieved)? {
469 BlockPosition::NextExpected | BlockPosition::NextPartial => {
470 self.state = SynchronizerState::Ready(latest_retrieved.clone());
471 trace!(
472 next = %latest_retrieved,
473 extractor = %extractor_id,
474 "SynchronizerStream transition to next expected"
475 )
476 }
477 BlockPosition::Latest | BlockPosition::Delayed => {
478 if !self.check_and_transition_to_stale_if_needed(
479 stale_threshold,
480 Some(latest_retrieved.clone()),
481 )? {
482 warn!(
483 %extractor_id,
484 ?last_message_at,
485 %block,
486 "SynchronizerStream transition transition to delayed."
487 );
488 self.state = SynchronizerState::Delayed(latest_retrieved.clone());
489 }
490 }
491 BlockPosition::Advanced => {
492 info!(
493 %extractor_id,
494 ?last_message_at,
495 latest = opt(&block_history.latest()),
496 %block,
497 "SynchronizerStream transition to advanced."
498 );
499 self.state = SynchronizerState::Advanced(latest_retrieved.clone());
500 }
501 }
502 self.modify_ts = Local::now().naive_utc();
503 Ok(())
504 }
505
506 fn mark_errored(&mut self, error: SynchronizerError) {
511 self.state = SynchronizerState::Ended(error.to_string());
512 self.modify_ts = Local::now().naive_utc();
513 self.error = Some(error);
514 }
515
516 fn mark_closed(&mut self) {
522 if !matches!(self.state, SynchronizerState::Ended(_)) {
523 self.state = SynchronizerState::Ended("Closed".to_string());
524 self.modify_ts = Local::now().naive_utc();
525 }
526 }
527
528 fn mark_stale(&mut self, header: &BlockHeader) {
530 self.state = SynchronizerState::Stale(header.clone());
531 self.modify_ts = Local::now().naive_utc();
532 }
533
534 fn mark_ready(&mut self, header: &BlockHeader) {
536 self.state = SynchronizerState::Ready(header.clone());
537 self.modify_ts = Local::now().naive_utc();
538 }
539
540 fn has_ended(&self) -> bool {
541 matches!(self.state, SynchronizerState::Ended(_))
542 }
543
544 fn is_stale(&self) -> bool {
545 matches!(self.state, SynchronizerState::Stale(_))
546 }
547
548 fn is_advanced(&self) -> bool {
549 matches!(self.state, SynchronizerState::Advanced(_))
550 }
551
552 fn get_current_header(&self) -> Option<&BlockHeader> {
556 match &self.state {
557 SynchronizerState::Ready(b) |
558 SynchronizerState::Delayed(b) |
559 SynchronizerState::Advanced(b) => Some(b),
560 _ => None,
561 }
562 }
563}
564
565#[derive(Debug, PartialEq, Clone)]
566pub struct FeedMessage<H = BlockHeader>
567where
568 H: HeaderLike,
569{
570 pub state_msgs: HashMap<String, StateSyncMessage<H>>,
571 pub sync_states: HashMap<String, SynchronizerState>,
572}
573
574impl<H> FeedMessage<H>
575where
576 H: HeaderLike,
577{
578 fn new(
579 state_msgs: HashMap<String, StateSyncMessage<H>>,
580 sync_states: HashMap<String, SynchronizerState>,
581 ) -> Self {
582 Self { state_msgs, sync_states }
583 }
584}
585
586impl<S> BlockSynchronizer<S>
587where
588 S: StateSynchronizer,
589{
590 pub fn new(
591 block_time: std::time::Duration,
592 latency_buffer: std::time::Duration,
593 max_missed_blocks: u64,
594 ) -> Self {
595 Self {
596 synchronizers: None,
597 max_messages: None,
598 block_time,
599 latency_buffer,
600 startup_timeout: block_time.mul_f64(max_missed_blocks as f64),
601 max_missed_blocks,
602 }
603 }
604
605 pub fn max_messages(&mut self, val: usize) {
611 self.max_messages = Some(val);
612 }
613
614 pub fn startup_timeout(mut self, val: Duration) {
618 self.startup_timeout = val;
619 }
620
621 pub fn register_synchronizer(mut self, id: ExtractorIdentity, synchronizer: S) -> Self {
622 let mut registered = self.synchronizers.unwrap_or_default();
623 registered.insert(id, synchronizer);
624 self.synchronizers = Some(registered);
625 self
626 }
627
628 #[cfg(test)]
629 pub fn with_short_timeouts() -> Self {
630 Self::new(Duration::from_millis(10), Duration::from_millis(10), 3)
631 }
632
633 async fn cleanup_synchronizers(
636 mut state_sync_tasks: FuturesUnordered<JoinHandle<()>>,
637 sync_close_senders: Vec<oneshot::Sender<()>>,
638 ) {
639 for close_sender in sync_close_senders {
641 let _ = close_sender.send(());
642 }
643
644 let mut completed_tasks = 0;
646 while let Ok(Some(_)) = timeout(Duration::from_secs(5), state_sync_tasks.next()).await {
647 completed_tasks += 1;
648 }
649
650 let remaining_tasks = state_sync_tasks.len();
652 if remaining_tasks > 0 {
653 warn!(
654 completed = completed_tasks,
655 timed_out = remaining_tasks,
656 "Some synchronizers timed out during cleanup and may not have shut down cleanly"
657 );
658 }
659 }
660
661 pub async fn run(
666 mut self,
667 ) -> BlockSyncResult<(JoinHandle<()>, Receiver<BlockSyncResult<FeedMessage<BlockHeader>>>)>
668 {
669 trace!("Starting BlockSynchronizer...");
670 let state_sync_tasks = FuturesUnordered::new();
671 let mut synchronizers = self
672 .synchronizers
673 .take()
674 .ok_or(BlockSynchronizerError::NoSynchronizers)?;
675 let init_results = join_all(synchronizers.iter_mut().map(|(id, s)| {
678 s.initialize()
679 .map(|res| (id.clone(), res))
680 }))
681 .await;
682 let mut to_skip = Vec::new();
683 for (extractor_id, result) in init_results {
684 match result {
685 Ok(()) => {}
686 Err(SynchronizerError::RPCError(crate::rpc::RPCError::UnknownExtractor(
687 reason,
688 ))) => {
689 warn!(%extractor_id, %reason, "Extractor not recognised by server, skipping");
690 to_skip.push(extractor_id);
691 }
692 Err(e) => {
693 return Err(BlockSynchronizerError::InitializationError {
694 extractor: extractor_id,
695 source: e,
696 })
697 }
698 }
699 }
700 for id in &to_skip {
701 synchronizers.remove(id);
702 }
703 if synchronizers.is_empty() {
704 return Err(BlockSynchronizerError::NoSynchronizers);
705 }
706
707 let mut sync_streams = Vec::with_capacity(synchronizers.len());
708 let mut sync_close_senders = Vec::new();
709 for (extractor_id, synchronizer) in synchronizers.drain() {
710 let (handle, rx) = synchronizer.start().await;
711 let (join_handle, close_sender) = handle.split();
712 state_sync_tasks.push(join_handle);
713 sync_close_senders.push(close_sender);
714
715 sync_streams.push(SynchronizerStream::new(&extractor_id, rx));
716 }
717
718 debug!("Waiting for initial synchronizer messages...");
721 let mut startup_futures = Vec::new();
722 for synchronizer in sync_streams.iter_mut() {
723 let fut = async {
724 let res = timeout(self.startup_timeout, synchronizer.rx.recv()).await;
725 (synchronizer, res)
726 };
727 startup_futures.push(fut);
728 }
729
730 let mut ready_sync_msgs = HashMap::new();
731 let initial_headers = join_all(startup_futures)
732 .await
733 .into_iter()
734 .filter_map(|(synchronizer, res)| {
735 let extractor_id = synchronizer.extractor_id.clone();
736 match res {
737 Ok(Some(Ok(msg))) => {
738 debug!(%extractor_id, height=?&msg.header.number, "Synchronizer started successfully!");
739 synchronizer.mark_ready(&msg.header);
741 let header = msg.header.clone();
742 ready_sync_msgs.insert(extractor_id.name.clone(), msg);
743 Some(header)
744 }
745 Ok(Some(Err(e))) => {
746 synchronizer.mark_errored(e);
747 None
748 }
749 Ok(None) => {
750 warn!(%extractor_id, "Synchronizer closed during startup");
755 synchronizer.mark_closed();
756 None
757 }
758 Err(_) => {
759 warn!(%extractor_id, "Timed out waiting for first message");
761 synchronizer.mark_stale(&BlockHeader::default());
762 None
763 }
764 }
765 })
766 .collect::<HashSet<_>>() .into_iter()
768 .collect::<Vec<_>>();
769
770 Self::require_active_stream(&sync_streams)?;
772 let mut block_history = BlockHistory::new(initial_headers, 15)?;
773 let start_header = block_history
777 .latest()
778 .ok_or(BlockHistoryError::EmptyHistory)?;
779 info!(
780 start_block=%start_header,
781 n_healthy=ready_sync_msgs.len(),
782 n_total=sync_streams.len(),
783 "Block synchronization started successfully!"
784 );
785
786 for stream in sync_streams.iter_mut() {
789 if let SynchronizerState::Ready(header) = stream.state.clone() {
790 if header.number < start_header.number {
791 debug!(
792 extractor_id=%stream.extractor_id,
793 synchronizer_block=header.number,
794 current_block=start_header.number,
795 "Marking synchronizer as delayed during initialization"
796 );
797 stream.state = SynchronizerState::Delayed(header);
798 }
799 }
800 }
801
802 let (sync_tx, sync_rx) = mpsc::channel(30);
803 let main_loop_jh = tokio::spawn(async move {
804 let mut n_iter = 1;
805 loop {
806 let msg = FeedMessage::new(
808 std::mem::take(&mut ready_sync_msgs),
809 sync_streams
810 .iter()
811 .map(|stream| (stream.extractor_id.name.to_string(), stream.state.clone()))
812 .collect(),
813 );
814 if sync_tx.send(Ok(msg)).await.is_err() {
815 info!("Receiver closed, block synchronizer terminating..");
816 return;
817 };
818
819 if let Some(max_messages) = self.max_messages {
821 if n_iter >= max_messages {
822 info!(max_messages, "StreamEnd");
823 return;
824 }
825 }
826 n_iter += 1;
827
828 let res = self
829 .handle_next_message(
830 &mut sync_streams,
831 &mut ready_sync_msgs,
832 &mut block_history,
833 )
834 .await;
835
836 if let Err(e) = res {
837 let _ = sync_tx.send(Err(e)).await;
839 return;
840 }
841 }
842 });
843
844 let nanny_jh = tokio::spawn(async move {
849 let _ = main_loop_jh.await.map_err(|e| {
851 if e.is_panic() {
852 error!("BlockSynchornizer main loop panicked: {e}")
853 }
854 });
855 debug!("Main loop exited. Closing synchronizers");
856 Self::cleanup_synchronizers(state_sync_tasks, sync_close_senders).await;
857 debug!("Shutdown complete");
858 });
859 Ok((nanny_jh, sync_rx))
860 }
861
862 async fn handle_next_message(
867 &self,
868 sync_streams: &mut [SynchronizerStream],
869 ready_sync_msgs: &mut HashMap<String, StateSyncMessage<BlockHeader>>,
870 block_history: &mut BlockHistory,
871 ) -> BlockSyncResult<()> {
872 let mut recv_futures = Vec::new();
873 for stream in sync_streams.iter_mut() {
874 if stream.has_ended() {
877 continue;
878 }
879 recv_futures.push(async {
889 let res = stream
890 .try_advance(
891 block_history,
892 self.block_time,
893 self.latency_buffer,
894 self.block_time
895 .mul_f64(self.max_missed_blocks as f64),
896 )
897 .await?;
898 Ok::<_, BlockSynchronizerError>(
899 res.map(|msg| (stream.extractor_id.name.clone(), msg)),
900 )
901 });
902 }
903 ready_sync_msgs.extend(
904 join_all(recv_futures)
905 .await
906 .into_iter()
907 .collect::<Result<Vec<_>, _>>()?
908 .into_iter()
909 .flatten(),
910 );
911
912 Self::check_streams(sync_streams)?;
915
916 if sync_streams
919 .iter()
920 .any(SynchronizerStream::is_advanced)
921 {
922 *block_history = Self::reinit_block_history(sync_streams, block_history)?;
923 } else if let Some(header) = sync_streams
924 .iter()
925 .filter_map(SynchronizerStream::get_current_header)
926 .max_by_key(|b| b.number)
927 {
928 block_history.push(header.clone())?;
929 }
930 Ok(())
933 }
934
935 fn reinit_block_history(
944 sync_streams: &mut [SynchronizerStream],
945 block_history: &mut BlockHistory,
946 ) -> Result<BlockHistory, BlockSynchronizerError> {
947 let previous = block_history
948 .latest()
949 .ok_or(BlockHistoryError::EmptyHistory)?;
951 let blocks = sync_streams
952 .iter()
953 .filter_map(SynchronizerStream::get_current_header)
954 .cloned()
955 .collect();
956 let new_block_history = BlockHistory::new(blocks, 10)?;
957 let latest = new_block_history
958 .latest()
959 .ok_or(BlockHistoryError::EmptyHistory)?;
961 info!(
962 %previous,
963 %latest,
964 "Advanced synchronizer detected. Reinitialized block history."
965 );
966 sync_streams
967 .iter_mut()
968 .for_each(|stream| {
969 if let Some(header) = stream.get_current_header() {
972 if header.number < latest.number {
973 stream.state = SynchronizerState::Delayed(header.clone());
974 } else if header.number == latest.number {
975 stream.state = SynchronizerState::Ready(header.clone());
976 }
977 }
978 });
979 Ok(new_block_history)
980 }
981
982 fn require_active_stream(sync_streams: &[SynchronizerStream]) -> BlockSyncResult<()> {
987 if sync_streams
988 .iter()
989 .any(|s| !s.has_ended() && !s.is_stale())
990 {
991 return Ok(());
992 }
993 let reason: Vec<String> = sync_streams
994 .iter()
995 .map(|s| format!("{} reported as {} at {}", s.extractor_id, s.state, s.modify_ts))
996 .collect();
997 Err(BlockSynchronizerError::NoReadySynchronizers(reason.join(", ")))
998 }
999
1000 fn check_streams(sync_streams: &[SynchronizerStream]) -> BlockSyncResult<()> {
1010 let mut has_any_ended = false;
1011 let mut latest_ended_stream: Option<&SynchronizerStream> = None;
1012
1013 for stream in sync_streams.iter() {
1014 if !stream.has_ended() && !stream.is_stale() {
1016 return Ok(());
1017 }
1018
1019 if stream.has_ended() {
1020 has_any_ended = true;
1021 if latest_ended_stream.is_none() ||
1022 stream.modify_ts >
1023 latest_ended_stream
1024 .as_ref()
1025 .unwrap()
1026 .modify_ts
1027 {
1028 latest_ended_stream = Some(stream);
1029 }
1030 }
1031 }
1032
1033 if !has_any_ended {
1036 return Ok(());
1037 }
1038
1039 let last_error_reason = if let Some(stream) = latest_ended_stream {
1041 if let Some(err) = &stream.error {
1042 format!("Synchronizer for {} errored with: {err}", stream.extractor_id)
1043 } else {
1044 format!("Synchronizer for {} became: {}", stream.extractor_id, stream.state)
1045 }
1046 } else {
1047 return Err(BlockSynchronizerError::NoSynchronizers);
1048 };
1049
1050 let mut reason = vec![last_error_reason];
1051
1052 sync_streams.iter().for_each(|stream| {
1053 reason.push(format!(
1054 "{} reported as {} at {}",
1055 stream.extractor_id, stream.state, stream.modify_ts
1056 ))
1057 });
1058
1059 Err(BlockSynchronizerError::NoReadySynchronizers(reason.join(", ")))
1060 }
1061}
1062
1063#[cfg(test)]
1064mod tests {
1065 use std::sync::Arc;
1066
1067 use async_trait::async_trait;
1068 use test_log::test;
1069 use tokio::sync::{oneshot, Mutex};
1070 use tycho_common::models::Chain;
1071
1072 use super::*;
1073 use crate::feed::synchronizer::{SyncResult, SynchronizerTaskHandle};
1074
1075 #[derive(Clone, Debug)]
1076 enum MockBehavior {
1077 Normal, IgnoreClose, ExitImmediately, }
1081
1082 type HeaderReceiver = Receiver<SyncResult<StateSyncMessage<BlockHeader>>>;
1083
1084 #[derive(Clone)]
1085 struct MockStateSync {
1086 header_tx: mpsc::Sender<SyncResult<StateSyncMessage<BlockHeader>>>,
1087 header_rx: Arc<Mutex<Option<HeaderReceiver>>>,
1088 close_received: Arc<Mutex<bool>>,
1089 behavior: MockBehavior,
1090 close_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
1092 }
1093
1094 impl MockStateSync {
1095 fn new() -> Self {
1096 Self::with_behavior(MockBehavior::Normal)
1097 }
1098
1099 fn with_behavior(behavior: MockBehavior) -> Self {
1100 let (tx, rx) = mpsc::channel(1);
1101 Self {
1102 header_tx: tx,
1103 header_rx: Arc::new(Mutex::new(Some(rx))),
1104 close_received: Arc::new(Mutex::new(false)),
1105 behavior,
1106 close_tx: Arc::new(Mutex::new(None)),
1107 }
1108 }
1109
1110 async fn was_close_received(&self) -> bool {
1111 *self.close_received.lock().await
1112 }
1113
1114 async fn send_header(&self, header: StateSyncMessage<BlockHeader>) -> Result<(), String> {
1115 self.header_tx
1116 .send(Ok(header))
1117 .await
1118 .map_err(|e| format!("sending header failed: {e}"))
1119 }
1120
1121 async fn trigger_close(&self) {
1123 if let Some(close_tx) = self.close_tx.lock().await.take() {
1124 let _ = close_tx.send(());
1125 }
1126 }
1127 }
1128
1129 #[async_trait]
1130 impl StateSynchronizer for MockStateSync {
1131 async fn initialize(&mut self) -> SyncResult<()> {
1132 Ok(())
1133 }
1134
1135 async fn start(
1136 mut self,
1137 ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>) {
1138 let block_rx = {
1139 let mut guard = self.header_rx.lock().await;
1140 guard
1141 .take()
1142 .expect("Block receiver was not set!")
1143 };
1144
1145 let (close_tx_for_handle, close_rx) = oneshot::channel();
1148 let (close_tx_for_test, close_rx_for_test) = oneshot::channel();
1149
1150 {
1152 let mut guard = self.close_tx.lock().await;
1153 *guard = Some(close_tx_for_test);
1154 }
1155
1156 let behavior = self.behavior.clone();
1157 let close_received_clone = self.close_received.clone();
1158 let tx = self.header_tx.clone();
1159
1160 let jh = tokio::spawn(async move {
1161 match behavior {
1162 MockBehavior::IgnoreClose => {
1163 loop {
1166 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
1167 }
1168 }
1169 MockBehavior::ExitImmediately => {
1170 tx.send(SyncResult::Err(SynchronizerError::ConnectionError(
1172 "Simulated immediate task failure".to_string(),
1173 )))
1174 .await
1175 .unwrap();
1176 }
1177 MockBehavior::Normal => {
1178 let _ = tokio::select! {
1181 result = close_rx => result,
1182 result = close_rx_for_test => result,
1183 };
1184 let mut guard = close_received_clone.lock().await;
1185 *guard = true;
1186 }
1187 }
1188 });
1189
1190 let handle = SynchronizerTaskHandle::new(jh, close_tx_for_handle);
1191 (handle, block_rx)
1192 }
1193 }
1194
1195 fn header_message(block: u8) -> StateSyncMessage<BlockHeader> {
1196 StateSyncMessage {
1197 header: BlockHeader {
1198 number: block as u64,
1199 hash: Bytes::from(vec![block]),
1200 parent_hash: Bytes::from(vec![block - 1]),
1201 revert: false,
1202 timestamp: 1000,
1203 partial_block_index: None,
1204 },
1205 ..Default::default()
1206 }
1207 }
1208
1209 fn partial_header_message(block: u8, partial_idx: u32) -> StateSyncMessage<BlockHeader> {
1212 let hash_bytes =
1214 [(block as u64).to_be_bytes().as_slice(), partial_idx.to_be_bytes().as_slice()]
1215 .concat();
1216 StateSyncMessage {
1217 header: BlockHeader {
1218 number: block as u64,
1219 hash: Bytes::from(hash_bytes),
1220 parent_hash: Bytes::from(vec![block - 1]),
1221 revert: false,
1222 timestamp: 1000,
1223 partial_block_index: Some(partial_idx),
1224 },
1225 ..Default::default()
1226 }
1227 }
1228
1229 fn revert_header_message(block: u8) -> StateSyncMessage<BlockHeader> {
1230 StateSyncMessage {
1231 header: BlockHeader {
1232 number: block as u64,
1233 hash: Bytes::from(vec![block]),
1234 parent_hash: Bytes::from(vec![block - 1]),
1235 revert: true,
1236 timestamp: 1000,
1237 partial_block_index: None,
1238 },
1239 ..Default::default()
1240 }
1241 }
1242
1243 async fn receive_message(rx: &mut Receiver<BlockSyncResult<FeedMessage>>) -> FeedMessage {
1244 timeout(Duration::from_millis(100), rx.recv())
1245 .await
1246 .expect("Responds in time")
1247 .expect("Should receive first message")
1248 .expect("No error")
1249 }
1250
1251 async fn setup_block_sync(
1252 ) -> (MockStateSync, MockStateSync, JoinHandle<()>, Receiver<BlockSyncResult<FeedMessage>>)
1253 {
1254 setup_block_sync_with_behaviour(MockBehavior::Normal, MockBehavior::Normal).await
1255 }
1256
1257 async fn setup_block_sync_with_behaviour(
1259 v2_behavior: MockBehavior,
1260 v3_behavior: MockBehavior,
1261 ) -> (MockStateSync, MockStateSync, JoinHandle<()>, Receiver<BlockSyncResult<FeedMessage>>)
1262 {
1263 let v2_sync = MockStateSync::with_behavior(v2_behavior);
1264 let v3_sync = MockStateSync::with_behavior(v3_behavior);
1265
1266 let mut block_sync = BlockSynchronizer::new(
1268 Duration::from_millis(20), Duration::from_millis(10), 3, );
1272 block_sync.max_messages(10); let block_sync = block_sync
1275 .register_synchronizer(
1276 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
1277 v2_sync.clone(),
1278 )
1279 .register_synchronizer(
1280 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
1281 v3_sync.clone(),
1282 );
1283
1284 let block1_msg = header_message(1);
1286 let _ = v2_sync
1287 .send_header(block1_msg.clone())
1288 .await;
1289 let _ = v3_sync
1290 .send_header(block1_msg.clone())
1291 .await;
1292
1293 let (nanny_handle, mut rx) = block_sync
1295 .run()
1296 .await
1297 .expect("BlockSynchronizer failed to start");
1298
1299 let first_feed_msg = receive_message(&mut rx).await;
1300 assert_eq!(first_feed_msg.state_msgs.len(), 2);
1301 assert!(matches!(
1302 first_feed_msg
1303 .sync_states
1304 .get("uniswap-v2")
1305 .unwrap(),
1306 SynchronizerState::Ready(_)
1307 ));
1308 assert!(matches!(
1309 first_feed_msg
1310 .sync_states
1311 .get("uniswap-v3")
1312 .unwrap(),
1313 SynchronizerState::Ready(_)
1314 ));
1315
1316 (v2_sync, v3_sync, nanny_handle, rx)
1317 }
1318
1319 async fn shutdown_block_synchronizer(
1320 nanny_handle: JoinHandle<()>,
1321 rx: Receiver<BlockSyncResult<FeedMessage>>,
1322 ) {
1323 drop(rx);
1327 timeout(Duration::from_secs(2), nanny_handle)
1328 .await
1329 .expect("Nanny failed to exit within time")
1330 .expect("Nanny panicked");
1331 }
1332
1333 async fn send_and_assert_ready(
1335 sync: &MockStateSync,
1336 sync_name: &str,
1337 rx: &mut Receiver<BlockSyncResult<FeedMessage>>,
1338 msg: StateSyncMessage<BlockHeader>,
1339 expected_block: u64,
1340 expected_partial: Option<u32>,
1341 ) {
1342 sync.send_header(msg)
1343 .await
1344 .expect("send failed");
1345 let feed_msg = receive_message(rx).await;
1346 let state = feed_msg
1347 .sync_states
1348 .get(sync_name)
1349 .unwrap();
1350 match state {
1351 SynchronizerState::Ready(h) => {
1352 assert_eq!(h.number, expected_block, "wrong block number");
1353 assert_eq!(h.partial_block_index, expected_partial, "wrong partial index");
1354 }
1355 other => panic!("expected Ready, got {:?}", other),
1356 }
1357 }
1358
1359 #[test(tokio::test)]
1360 async fn test_two_ready_synchronizers() {
1361 let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1362
1363 let second_msg = header_message(2);
1364 v2_sync
1365 .send_header(second_msg.clone())
1366 .await
1367 .expect("send_header failed");
1368 v3_sync
1369 .send_header(second_msg.clone())
1370 .await
1371 .expect("send_header failed");
1372 let second_feed_msg = receive_message(&mut rx).await;
1373
1374 let exp2 = FeedMessage {
1375 state_msgs: [
1376 ("uniswap-v2".to_string(), second_msg.clone()),
1377 ("uniswap-v3".to_string(), second_msg.clone()),
1378 ]
1379 .into_iter()
1380 .collect(),
1381 sync_states: [
1382 ("uniswap-v3".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
1383 ("uniswap-v2".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
1384 ]
1385 .into_iter()
1386 .collect(),
1387 };
1388 assert_eq!(second_feed_msg, exp2);
1389
1390 shutdown_block_synchronizer(nanny_handle, rx).await;
1391 }
1392
1393 #[test(tokio::test)]
1394 async fn test_delayed_synchronizer_catches_up() {
1395 let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1396
1397 let block2_msg = header_message(2);
1399 v2_sync
1400 .send_header(block2_msg.clone())
1401 .await
1402 .expect("send_header failed");
1403
1404 let second_feed_msg = receive_message(&mut rx).await;
1406 debug!("Consumed second message for v2");
1407
1408 assert!(second_feed_msg
1409 .state_msgs
1410 .contains_key("uniswap-v2"));
1411 assert!(matches!(
1412 second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1413 SynchronizerState::Ready(header) if header.number == 2
1414 ));
1415 assert!(!second_feed_msg
1416 .state_msgs
1417 .contains_key("uniswap-v3"));
1418 assert!(matches!(
1419 second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1420 SynchronizerState::Delayed(header) if header.number == 1
1421 ));
1422
1423 v3_sync
1425 .send_header(block2_msg.clone())
1426 .await
1427 .expect("send_header failed");
1428
1429 let block3_msg = header_message(3);
1431 v2_sync
1432 .send_header(block3_msg.clone())
1433 .await
1434 .expect("send_header failed");
1435 v3_sync
1436 .send_header(block3_msg)
1437 .await
1438 .expect("send_header failed");
1439
1440 let mut third_feed_msg = receive_message(&mut rx).await;
1443
1444 if !third_feed_msg
1447 .state_msgs
1448 .contains_key("uniswap-v2")
1449 {
1450 third_feed_msg = rx
1451 .recv()
1452 .await
1453 .expect("header channel was closed")
1454 .expect("no error");
1455 }
1456 assert!(third_feed_msg
1457 .state_msgs
1458 .contains_key("uniswap-v2"));
1459 assert!(third_feed_msg
1460 .state_msgs
1461 .contains_key("uniswap-v3"));
1462 assert!(matches!(
1463 third_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1464 SynchronizerState::Ready(header) if header.number == 3
1465 ));
1466 assert!(matches!(
1467 third_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1468 SynchronizerState::Ready(header) if header.number == 3
1469 ));
1470
1471 shutdown_block_synchronizer(nanny_handle, rx).await;
1472 }
1473
1474 #[test(tokio::test)]
1475 async fn test_different_start_blocks() {
1476 let v2_sync = MockStateSync::new();
1477 let v3_sync = MockStateSync::new();
1478 let block_sync = BlockSynchronizer::with_short_timeouts()
1479 .register_synchronizer(
1480 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
1481 v2_sync.clone(),
1482 )
1483 .register_synchronizer(
1484 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
1485 v3_sync.clone(),
1486 );
1487
1488 let block1_msg = header_message(1);
1490 let block2_msg = header_message(2);
1491
1492 let _ = v2_sync
1493 .send_header(block1_msg.clone())
1494 .await;
1495 v3_sync
1496 .send_header(block2_msg.clone())
1497 .await
1498 .expect("send_header failed");
1499
1500 let (jh, mut rx) = block_sync
1502 .run()
1503 .await
1504 .expect("BlockSynchronizer failed to start.");
1505
1506 let first_feed_msg = receive_message(&mut rx).await;
1508 assert!(matches!(
1509 first_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1510 SynchronizerState::Delayed(header) if header.number == 1
1511 ));
1512 assert!(matches!(
1513 first_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1514 SynchronizerState::Ready(header) if header.number == 2
1515 ));
1516
1517 v2_sync
1519 .send_header(block2_msg.clone())
1520 .await
1521 .expect("send_header failed");
1522
1523 let block3_msg = header_message(3);
1525 let _ = v2_sync
1526 .send_header(block3_msg.clone())
1527 .await;
1528 v3_sync
1529 .send_header(block3_msg.clone())
1530 .await
1531 .expect("send_header failed");
1532
1533 let second_feed_msg = receive_message(&mut rx).await;
1535 assert_eq!(second_feed_msg.state_msgs.len(), 2);
1536 assert!(matches!(
1537 second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1538 SynchronizerState::Ready(header) if header.number == 3
1539 ));
1540 assert!(matches!(
1541 second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1542 SynchronizerState::Ready(header) if header.number == 3
1543 ));
1544
1545 shutdown_block_synchronizer(jh, rx).await;
1546 }
1547
1548 #[test(tokio::test)]
1549 async fn test_synchronizer_fails_other_goes_stale() {
1550 let (_v2_sync, v3_sync, nanny_handle, mut sync_rx) =
1551 setup_block_sync_with_behaviour(MockBehavior::ExitImmediately, MockBehavior::Normal)
1552 .await;
1553
1554 let mut error_reported = false;
1555 for _ in 0..3 {
1556 if let Some(msg) = sync_rx.recv().await {
1557 match msg {
1558 Err(_) => error_reported = true,
1559 Ok(msg) => {
1560 assert!(matches!(
1561 msg.sync_states
1562 .get("uniswap-v3")
1563 .unwrap(),
1564 SynchronizerState::Delayed(_)
1565 ));
1566 assert!(matches!(
1567 msg.sync_states
1568 .get("uniswap-v2")
1569 .unwrap(),
1570 SynchronizerState::Ended(_)
1571 ));
1572 }
1573 }
1574 }
1575 }
1576 assert!(error_reported, "BlockSynchronizer did not report final error");
1577
1578 let result = timeout(Duration::from_secs(2), nanny_handle).await;
1580 assert!(result.is_ok(), "Nanny should complete when synchronizer task exits");
1581
1582 assert!(
1584 v3_sync.was_close_received().await,
1585 "v3_sync should have received close signal during cleanup"
1586 );
1587 }
1588
1589 #[test(tokio::test)]
1590 async fn test_cleanup_timeout_warning() {
1591 let (_v2_sync, _v3_sync, nanny_handle, _rx) = setup_block_sync_with_behaviour(
1594 MockBehavior::ExitImmediately,
1595 MockBehavior::IgnoreClose,
1596 )
1597 .await;
1598
1599 let result = timeout(Duration::from_secs(10), nanny_handle).await;
1601 assert!(
1602 result.is_ok(),
1603 "Nanny should complete even when some synchronizers timeout during cleanup"
1604 );
1605
1606 }
1610
1611 #[test(tokio::test)]
1612 async fn test_one_synchronizer_goes_stale_while_other_works() {
1613 let (_v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1615
1616 let block2_msg = header_message(2);
1618 let _ = v3_sync
1619 .send_header(block2_msg.clone())
1620 .await;
1621 let second_feed_msg = receive_message(&mut rx).await;
1625 assert!(second_feed_msg
1626 .state_msgs
1627 .contains_key("uniswap-v3"));
1628 assert!(!second_feed_msg
1629 .state_msgs
1630 .contains_key("uniswap-v2"));
1631 assert!(matches!(
1632 second_feed_msg
1633 .sync_states
1634 .get("uniswap-v3")
1635 .unwrap(),
1636 SynchronizerState::Ready(_)
1637 ));
1638 if let Some(v2_state) = second_feed_msg
1640 .sync_states
1641 .get("uniswap-v2")
1642 {
1643 if matches!(v2_state, SynchronizerState::Delayed(_)) {
1644 assert!(
1646 !nanny_handle.is_finished(),
1647 "Nanny should still be running when synchronizer is delayed (not stale yet)"
1648 );
1649 }
1650 }
1651
1652 tokio::time::sleep(Duration::from_millis(15)).await;
1654
1655 let block3_msg = header_message(3);
1657 let _ = v3_sync
1658 .send_header(block3_msg.clone())
1659 .await;
1660
1661 tokio::time::sleep(Duration::from_millis(40)).await;
1662
1663 let mut stale_found = false;
1664 for _ in 0..2 {
1665 if let Some(Ok(msg)) = rx.recv().await {
1666 if let Some(SynchronizerState::Stale(_)) = msg.sync_states.get("uniswap-v2") {
1667 stale_found = true;
1668 }
1669 }
1670 }
1671 assert!(stale_found, "v2 synchronizer should be stale");
1672
1673 shutdown_block_synchronizer(nanny_handle, rx).await;
1674 }
1675
1676 #[test(tokio::test)]
1677 async fn test_all_synchronizers_stale_loop_continues() {
1678 let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1681
1682 let mut seen_delayed = false;
1684 let mut seen_stale = false;
1685 let start_time = tokio::time::Instant::now();
1686
1687 while let Ok(Some(Ok(msg))) =
1688 tokio::time::timeout(Duration::from_millis(50), rx.recv()).await
1689 {
1690 let v2_state = msg.sync_states.get("uniswap-v2");
1691 let v3_state = msg.sync_states.get("uniswap-v3");
1692
1693 if !seen_delayed &&
1694 (matches!(v2_state, Some(SynchronizerState::Delayed(_))) ||
1695 matches!(v3_state, Some(SynchronizerState::Delayed(_))))
1696 {
1697 seen_delayed = true;
1698 assert!(
1699 !nanny_handle.is_finished(),
1700 "Nanny must still run when synchronizers are Delayed"
1701 );
1702 }
1703
1704 if matches!(v2_state, Some(SynchronizerState::Stale(_))) &&
1705 matches!(v3_state, Some(SynchronizerState::Stale(_)))
1706 {
1707 seen_stale = true;
1708 assert!(
1709 !nanny_handle.is_finished(),
1710 "Main loop must not exit when all synchronizers are Stale (awaiting recovery)"
1711 );
1712 break;
1713 }
1714
1715 if start_time.elapsed() > Duration::from_millis(500) {
1716 break;
1717 }
1718 }
1719
1720 assert!(seen_delayed, "Synchronizers should transition through Delayed first");
1721 assert!(seen_stale, "Both synchronizers should reach Stale state");
1722
1723 v2_sync.trigger_close().await;
1728 v3_sync.trigger_close().await;
1729 drop(v2_sync);
1730 drop(v3_sync);
1731
1732 let mut error_reported = false;
1734 while let Some(msg) = rx.recv().await {
1735 if msg.is_err() {
1736 error_reported = true;
1737 }
1738 }
1739 assert!(error_reported, "Expected an error after all synchronizers ended");
1740
1741 let nanny_result = timeout(Duration::from_secs(2), nanny_handle).await;
1742 assert!(nanny_result.is_ok(), "Nanny should complete after all synchronizers ended");
1743 }
1744
1745 #[test(tokio::test)]
1746 async fn test_all_synchronizers_recover_after_going_stale() {
1747 let v2_sync = MockStateSync::new();
1750 let v3_sync = MockStateSync::new();
1751 let block_sync =
1752 BlockSynchronizer::new(Duration::from_millis(20), Duration::from_millis(10), 3)
1753 .register_synchronizer(
1754 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
1755 v2_sync.clone(),
1756 )
1757 .register_synchronizer(
1758 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
1759 v3_sync.clone(),
1760 );
1761
1762 v2_sync
1763 .send_header(header_message(1))
1764 .await
1765 .unwrap();
1766 v3_sync
1767 .send_header(header_message(1))
1768 .await
1769 .unwrap();
1770
1771 let (nanny_handle, mut rx) = block_sync
1772 .run()
1773 .await
1774 .expect("BlockSynchronizer start failed");
1775
1776 let first_msg = receive_message(&mut rx).await;
1777 assert!(matches!(
1778 first_msg.sync_states.get("uniswap-v2").unwrap(),
1779 SynchronizerState::Ready(h) if h.number == 1
1780 ));
1781 assert!(matches!(
1782 first_msg.sync_states.get("uniswap-v3").unwrap(),
1783 SynchronizerState::Ready(h) if h.number == 1
1784 ));
1785
1786 let mut seen_stale = false;
1788 let start_time = tokio::time::Instant::now();
1789 while let Ok(Some(Ok(msg))) =
1790 tokio::time::timeout(Duration::from_millis(50), rx.recv()).await
1791 {
1792 let v2 = msg
1793 .sync_states
1794 .get("uniswap-v2")
1795 .unwrap();
1796 let v3 = msg
1797 .sync_states
1798 .get("uniswap-v3")
1799 .unwrap();
1800 if matches!(v2, SynchronizerState::Stale(_)) &&
1801 matches!(v3, SynchronizerState::Stale(_))
1802 {
1803 seen_stale = true;
1804 assert!(
1805 !nanny_handle.is_finished(),
1806 "Main loop must not exit while synchronizers are Stale"
1807 );
1808 break;
1809 }
1810 if start_time.elapsed() > Duration::from_millis(500) {
1811 break;
1812 }
1813 }
1814 assert!(seen_stale, "Both synchronizers should go Stale");
1815
1816 v2_sync
1818 .send_header(header_message(5))
1819 .await
1820 .unwrap();
1821 v3_sync
1822 .send_header(header_message(5))
1823 .await
1824 .unwrap();
1825
1826 let mut recovered = false;
1829 for _ in 0..20 {
1830 let msg = receive_message(&mut rx).await;
1831 let v2 = msg
1832 .sync_states
1833 .get("uniswap-v2")
1834 .unwrap();
1835 let v3 = msg
1836 .sync_states
1837 .get("uniswap-v3")
1838 .unwrap();
1839 if matches!(v2, SynchronizerState::Ready(h) if h.number == 5) &&
1840 matches!(v3, SynchronizerState::Ready(h) if h.number == 5)
1841 {
1842 recovered = true;
1843 break;
1844 }
1845 }
1846 assert!(recovered, "Both synchronizers should recover to Ready at block 5");
1847
1848 drop(rx);
1850 timeout(Duration::from_secs(2), nanny_handle)
1851 .await
1852 .expect("Nanny timed out")
1853 .expect("Nanny panicked");
1854 }
1855
1856 #[test(tokio::test)]
1857 async fn test_stale_synchronizer_recovers() {
1858 let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1860
1861 tokio::time::sleep(Duration::from_millis(50)).await;
1863 let block2_msg = header_message(2);
1864 let _ = v2_sync
1865 .send_header(block2_msg.clone())
1866 .await;
1867
1868 for _ in 0..2 {
1870 if let Some(msg) = rx.recv().await {
1871 if let Ok(msg) = msg {
1872 if matches!(
1873 msg.sync_states
1874 .get("uniswap-v2")
1875 .unwrap(),
1876 SynchronizerState::Ready(_)
1877 ) {
1878 assert!(matches!(
1879 msg.sync_states
1880 .get("uniswap-v3")
1881 .unwrap(),
1882 SynchronizerState::Delayed(_)
1883 ));
1884 break;
1885 };
1886 }
1887 } else {
1888 panic!("Channel closed unexpectedly")
1889 }
1890 }
1891
1892 tokio::time::sleep(Duration::from_millis(15)).await;
1894 let block3_msg = header_message(3);
1895 let _ = v2_sync
1896 .send_header(block3_msg.clone())
1897 .await;
1898 let third_msg = receive_message(&mut rx).await;
1899 dbg!(&third_msg);
1900 assert!(matches!(
1901 third_msg
1902 .sync_states
1903 .get("uniswap-v2")
1904 .unwrap(),
1905 SynchronizerState::Ready(_)
1906 ));
1907 assert!(matches!(
1908 third_msg
1909 .sync_states
1910 .get("uniswap-v3")
1911 .unwrap(),
1912 SynchronizerState::Stale(_)
1913 ));
1914
1915 let block4_msg = header_message(4);
1916 let _ = v3_sync
1917 .send_header(block2_msg.clone())
1918 .await;
1919 let _ = v3_sync
1920 .send_header(block3_msg.clone())
1921 .await;
1922 let _ = v3_sync
1923 .send_header(block4_msg.clone())
1924 .await;
1925 let _ = v2_sync
1926 .send_header(block4_msg.clone())
1927 .await;
1928 let fourth_msg = receive_message(&mut rx).await;
1929 assert!(matches!(
1930 fourth_msg
1931 .sync_states
1932 .get("uniswap-v2")
1933 .unwrap(),
1934 SynchronizerState::Ready(_)
1935 ));
1936 assert!(matches!(
1937 fourth_msg
1938 .sync_states
1939 .get("uniswap-v3")
1940 .unwrap(),
1941 SynchronizerState::Ready(_)
1942 ));
1943
1944 shutdown_block_synchronizer(nanny_handle, rx).await;
1945
1946 assert!(
1948 v2_sync.was_close_received().await,
1949 "v2_sync should have received close signal during cleanup"
1950 );
1951 assert!(
1952 v3_sync.was_close_received().await,
1953 "v3_sync should have received close signal during cleanup"
1954 );
1955 }
1956
1957 #[test(tokio::test)]
1958 async fn test_all_synchronizer_advanced() {
1959 let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1963
1964 let block3 = header_message(3);
1965 v2_sync
1966 .send_header(block3.clone())
1967 .await
1968 .unwrap();
1969 v3_sync
1970 .send_header(block3)
1971 .await
1972 .unwrap();
1973
1974 let msg = receive_message(&mut rx).await;
1975 matches!(
1976 msg.sync_states
1977 .get("uniswap-v2")
1978 .unwrap(),
1979 SynchronizerState::Ready(_)
1980 );
1981 matches!(
1982 msg.sync_states
1983 .get("uniswap-v3")
1984 .unwrap(),
1985 SynchronizerState::Ready(_)
1986 );
1987
1988 shutdown_block_synchronizer(nanny_handle, rx).await;
1989 }
1990
1991 #[test(tokio::test)]
1992 async fn test_one_synchronizer_advanced() {
1993 let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1994
1995 let block2 = header_message(2);
1996 let block4 = header_message(4);
1997 v2_sync
1998 .send_header(block4.clone())
1999 .await
2000 .unwrap();
2001 v3_sync
2002 .send_header(block2.clone())
2003 .await
2004 .unwrap();
2005
2006 let msg = receive_message(&mut rx).await;
2007 matches!(
2008 msg.sync_states
2009 .get("uniswap-v2")
2010 .unwrap(),
2011 SynchronizerState::Ready(_)
2012 );
2013 matches!(
2014 msg.sync_states
2015 .get("uniswap-v3")
2016 .unwrap(),
2017 SynchronizerState::Delayed(_)
2018 );
2019
2020 shutdown_block_synchronizer(nanny_handle, rx).await;
2021 }
2022
2023 #[test(tokio::test)]
2024 async fn test_partial_blocks_normal_operation() {
2025 let (v2_sync, _v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
2028
2029 send_and_assert_ready(
2031 &v2_sync,
2032 "uniswap-v2",
2033 &mut rx,
2034 partial_header_message(2, 0),
2035 2,
2036 Some(0),
2037 )
2038 .await;
2039 send_and_assert_ready(
2040 &v2_sync,
2041 "uniswap-v2",
2042 &mut rx,
2043 partial_header_message(2, 3),
2044 2,
2045 Some(3),
2046 )
2047 .await;
2048 send_and_assert_ready(
2049 &v2_sync,
2050 "uniswap-v2",
2051 &mut rx,
2052 partial_header_message(2, 7),
2053 2,
2054 Some(7),
2055 )
2056 .await;
2057
2058 send_and_assert_ready(
2060 &v2_sync,
2061 "uniswap-v2",
2062 &mut rx,
2063 partial_header_message(3, 0),
2064 3,
2065 Some(0),
2066 )
2067 .await;
2068 send_and_assert_ready(
2069 &v2_sync,
2070 "uniswap-v2",
2071 &mut rx,
2072 partial_header_message(3, 2),
2073 3,
2074 Some(2),
2075 )
2076 .await;
2077
2078 shutdown_block_synchronizer(nanny_handle, rx).await;
2079 }
2080
2081 #[test(tokio::test)]
2082 async fn test_partial_blocks_handles_reverts() {
2083 let (v2_sync, _v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
2086
2087 send_and_assert_ready(&v2_sync, "uniswap-v2", &mut rx, header_message(2), 2, None).await;
2089
2090 send_and_assert_ready(
2092 &v2_sync,
2093 "uniswap-v2",
2094 &mut rx,
2095 partial_header_message(3, 0),
2096 3,
2097 Some(0),
2098 )
2099 .await;
2100 send_and_assert_ready(
2101 &v2_sync,
2102 "uniswap-v2",
2103 &mut rx,
2104 partial_header_message(3, 2),
2105 3,
2106 Some(2),
2107 )
2108 .await;
2109
2110 send_and_assert_ready(&v2_sync, "uniswap-v2", &mut rx, revert_header_message(2), 2, None)
2112 .await;
2113
2114 send_and_assert_ready(&v2_sync, "uniswap-v2", &mut rx, header_message(3), 3, None).await;
2116
2117 shutdown_block_synchronizer(nanny_handle, rx).await;
2118 }
2119
2120 #[test(tokio::test)]
2121 async fn test_partial_blocks_delayed_synchronizer_catches_up() {
2122 let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
2125
2126 let partial_0 = partial_header_message(2, 0);
2128 v2_sync
2129 .send_header(partial_0.clone())
2130 .await
2131 .expect("send partial 0 failed");
2132
2133 let msg = receive_message(&mut rx).await;
2134 assert!(msg
2136 .state_msgs
2137 .contains_key("uniswap-v2"));
2138 assert!(!msg
2139 .state_msgs
2140 .contains_key("uniswap-v3"));
2141 assert!(matches!(
2142 msg.sync_states.get("uniswap-v2").unwrap(),
2143 SynchronizerState::Ready(h) if h.partial_block_index == Some(0)
2144 ));
2145 assert!(matches!(
2146 msg.sync_states
2147 .get("uniswap-v3")
2148 .unwrap(),
2149 SynchronizerState::Delayed(_)
2150 ));
2151
2152 let partial_2 = partial_header_message(2, 2);
2154 v2_sync
2155 .send_header(partial_2.clone())
2156 .await
2157 .expect("send partial 2 failed");
2158 v3_sync
2159 .send_header(partial_0.clone())
2160 .await
2161 .expect("v3 catch up partial 0 failed");
2162 v3_sync
2163 .send_header(partial_2.clone())
2164 .await
2165 .expect("v3 catch up partial 2 failed");
2166
2167 let mut v3_ready = false;
2169 for _ in 0..3 {
2170 let msg = receive_message(&mut rx).await;
2171 if matches!(
2172 msg.sync_states.get("uniswap-v3").unwrap(),
2173 SynchronizerState::Ready(h) if h.partial_block_index == Some(2)
2174 ) {
2175 v3_ready = true;
2176 break;
2177 }
2178 }
2179 assert!(v3_ready, "v3 caught up to partial 2");
2180
2181 shutdown_block_synchronizer(nanny_handle, rx).await;
2182 }
2183}