1use std::collections::{HashMap, HashSet};
2
3use chrono::{Duration as ChronoDuration, Local, NaiveDateTime};
4use futures03::{
5 future::{join_all, try_join_all},
6 stream::FuturesUnordered,
7 StreamExt,
8};
9use serde::{Deserialize, Serialize};
10use thiserror::Error;
11use tokio::{
12 select,
13 sync::mpsc::{self, Receiver},
14 task::JoinHandle,
15 time::timeout,
16};
17use tracing::{debug, error, info, trace, warn};
18use tycho_common::{
19 dto::{Block, ExtractorIdentity},
20 Bytes,
21};
22
23use crate::feed::{
24 block_history::{BlockHistory, BlockHistoryError, BlockPosition},
25 synchronizer::{StateSyncMessage, StateSynchronizer, SynchronizerError},
26};
27
28mod block_history;
29pub mod component_tracker;
30pub mod synchronizer;
31
32pub trait HeaderLike {
37 fn block(self) -> Option<BlockHeader>;
38 fn block_number_or_timestamp(self) -> u64;
39}
40
41#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, Eq, Hash)]
42pub struct BlockHeader {
43 pub hash: Bytes,
44 pub number: u64,
45 pub parent_hash: Bytes,
46 pub revert: bool,
47 pub timestamp: u64,
48}
49
50impl BlockHeader {
51 fn from_block(block: &Block, revert: bool) -> Self {
52 Self {
53 hash: block.hash.clone(),
54 number: block.number,
55 parent_hash: block.parent_hash.clone(),
56 revert,
57 timestamp: block.ts.timestamp() as u64,
58 }
59 }
60}
61
62impl HeaderLike for BlockHeader {
63 fn block(self) -> Option<BlockHeader> {
64 Some(self)
65 }
66
67 fn block_number_or_timestamp(self) -> u64 {
68 self.number
69 }
70}
71
72#[derive(Error, Debug)]
73pub enum BlockSynchronizerError {
74 #[error("Failed to initialize synchronizer: {0}")]
75 InitializationError(#[from] SynchronizerError),
76
77 #[error("Failed to process new block: {0}")]
78 BlockHistoryError(#[from] BlockHistoryError),
79
80 #[error("Not a single synchronizer was ready")]
81 NoReadySynchronizers,
82
83 #[error("No synchronizers were set")]
84 NoSynchronizers,
85
86 #[error("Failed to convert duration: {0}")]
87 DurationConversionError(String),
88}
89
90type BlockSyncResult<T> = Result<T, BlockSynchronizerError>;
91
92pub struct BlockSynchronizer<S> {
139 synchronizers: Option<HashMap<ExtractorIdentity, S>>,
140 block_time: std::time::Duration,
141 max_wait: std::time::Duration,
142 max_messages: Option<usize>,
143 max_missed_blocks: u64,
144}
145
146#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
147#[serde(tag = "status", rename_all = "lowercase")]
148pub enum SynchronizerState {
149 Started,
150 Ready(BlockHeader),
151 Stale(BlockHeader),
153 Delayed(BlockHeader),
154 Advanced(BlockHeader),
158 Ended,
159}
160
161pub struct SynchronizerStream {
162 extractor_id: ExtractorIdentity,
163 state: SynchronizerState,
164 modify_ts: NaiveDateTime,
165 rx: Receiver<StateSyncMessage<BlockHeader>>,
166}
167
168impl SynchronizerStream {
169 async fn try_advance(
170 &mut self,
171 block_history: &BlockHistory,
172 max_wait: std::time::Duration,
173 stale_threshold: std::time::Duration,
174 ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
175 let extractor_id = self.extractor_id.clone();
176 let latest_block = block_history.latest();
177 match &self.state {
178 SynchronizerState::Started | SynchronizerState::Ended => {
179 warn!(state=?&self.state, "Advancing Synchronizer in this state not supported!");
180 Ok(None)
181 }
182 SynchronizerState::Advanced(b) => {
183 let future_block = b.clone();
184 self.transition(future_block, block_history, stale_threshold)?;
186 Ok(None)
187 }
188 SynchronizerState::Ready(previous_block) => {
189 self.try_recv_next_expected(
191 max_wait,
192 block_history,
193 previous_block.clone(),
194 stale_threshold,
195 )
196 .await
197 }
199 SynchronizerState::Delayed(old_block) => {
200 debug!(
202 ?old_block,
203 ?latest_block,
204 %extractor_id,
205 "Trying to catch up to latest block"
206 );
207 self.try_catch_up(block_history, max_wait, stale_threshold)
208 .await
209 }
210 SynchronizerState::Stale(old_block) => {
211 debug!(
212 ?old_block,
213 ?latest_block,
214 %extractor_id,
215 "Trying to catch up to latest block"
216 );
217 self.try_catch_up(block_history, max_wait, stale_threshold)
218 .await
219 }
220 }
221 }
222
223 async fn try_recv_next_expected(
228 &mut self,
229 max_wait: std::time::Duration,
230 block_history: &BlockHistory,
231 previous_block: BlockHeader,
232 stale_threshold: std::time::Duration,
233 ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
234 let extractor_id = self.extractor_id.clone();
235 match timeout(max_wait, self.rx.recv()).await {
236 Ok(Some(msg)) => {
237 self.transition(msg.header.clone(), block_history, stale_threshold)?;
238 Ok(Some(msg))
239 }
240 Ok(None) => {
241 error!(
242 %extractor_id,
243 ?previous_block,
244 "Extractor terminated: channel closed!"
245 );
246 self.state = SynchronizerState::Ended;
247 self.modify_ts = Local::now().naive_utc();
248 Ok(None)
249 }
250 Err(_) => {
251 debug!(%extractor_id, ?previous_block, "Extractor did not check in within time.");
253 self.state = SynchronizerState::Delayed(previous_block.clone());
254 Ok(None)
255 }
256 }
257 }
258
259 async fn try_catch_up(
265 &mut self,
266 block_history: &BlockHistory,
267 max_wait: std::time::Duration,
268 stale_threshold: std::time::Duration,
269 ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
270 let mut results = Vec::new();
271 let extractor_id = self.extractor_id.clone();
272
273 let deadline = std::time::Instant::now() + max_wait;
275
276 while std::time::Instant::now() < deadline {
277 match timeout(
278 deadline.saturating_duration_since(std::time::Instant::now()),
279 self.rx.recv(),
280 )
281 .await
282 {
283 Ok(Some(msg)) => {
284 debug!(%extractor_id, block_num=?msg.header.number, "Received new message during catch-up");
285 let block_pos = block_history.determine_block_position(&msg.header)?;
286 results.push(msg);
287 if matches!(block_pos, BlockPosition::NextExpected) {
288 break;
289 }
290 }
291 Ok(None) => {
292 warn!(%extractor_id, "Channel closed during catch-up");
293 self.state = SynchronizerState::Ended;
294 return Ok(None);
295 }
296 Err(_) => {
297 debug!(%extractor_id, "Timed out waiting for catch-up");
298 break;
299 }
300 }
301 }
302
303 let merged = results
304 .into_iter()
305 .reduce(|l, r| l.merge(r));
306
307 if let Some(msg) = merged {
308 debug!(?extractor_id, "Delayed extractor made progress!");
310 self.transition(msg.header.clone(), block_history, stale_threshold)?;
311 Ok(Some(msg))
312 } else {
313 Ok(None)
314 }
315 }
316
317 fn transition(
324 &mut self,
325 latest_retrieved: BlockHeader,
326 block_history: &BlockHistory,
327 stale_threshold: std::time::Duration,
328 ) -> Result<(), BlockSynchronizerError> {
329 let extractor_id = self.extractor_id.clone();
330 let last_message_at = self.modify_ts;
331 let block = &latest_retrieved;
332
333 match block_history.determine_block_position(&latest_retrieved)? {
334 BlockPosition::NextExpected => {
335 self.state = SynchronizerState::Ready(latest_retrieved.clone());
336 }
337 BlockPosition::Latest | BlockPosition::Delayed => {
338 let now = Local::now().naive_utc();
339 let wait_duration = self
340 .modify_ts
341 .signed_duration_since(now);
342 let stale_threshold_chrono = ChronoDuration::from_std(stale_threshold)
343 .map_err(|e| BlockSynchronizerError::DurationConversionError(e.to_string()))?;
344 if wait_duration > stale_threshold_chrono {
345 warn!(
346 ?extractor_id,
347 ?last_message_at,
348 ?block,
349 "Extractor transition to stale."
350 );
351 self.state = SynchronizerState::Stale(latest_retrieved.clone());
352 } else {
353 warn!(
354 ?extractor_id,
355 ?last_message_at,
356 ?block,
357 "Extractor transition to delayed."
358 );
359 self.state = SynchronizerState::Delayed(latest_retrieved.clone());
360 }
361 }
362 BlockPosition::Advanced => {
363 error!(
364 ?extractor_id,
365 ?last_message_at,
366 ?block,
367 "Extractor transition to advanced."
368 );
369 self.state = SynchronizerState::Advanced(latest_retrieved.clone());
370 }
371 }
372 self.modify_ts = Local::now().naive_utc();
373 Ok(())
374 }
375}
376
377#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
378pub struct FeedMessage<H>
379where
380 H: HeaderLike,
381{
382 pub state_msgs: HashMap<String, StateSyncMessage<H>>,
383 pub sync_states: HashMap<String, SynchronizerState>,
384}
385
386impl<H> FeedMessage<H>
387where
388 H: HeaderLike,
389{
390 fn new(
391 state_msgs: HashMap<String, StateSyncMessage<H>>,
392 sync_states: HashMap<String, SynchronizerState>,
393 ) -> Self {
394 Self { state_msgs, sync_states }
395 }
396}
397
398impl<S> BlockSynchronizer<S>
399where
400 S: StateSynchronizer,
401{
402 pub fn new(
403 block_time: std::time::Duration,
404 max_wait: std::time::Duration,
405 max_missed_blocks: u64,
406 ) -> Self {
407 Self { synchronizers: None, max_messages: None, block_time, max_wait, max_missed_blocks }
408 }
409
410 pub fn max_messages(&mut self, val: usize) {
411 self.max_messages = Some(val);
412 }
413
414 pub fn register_synchronizer(mut self, id: ExtractorIdentity, synchronizer: S) -> Self {
415 let mut registered = self.synchronizers.unwrap_or_default();
416 registered.insert(id, synchronizer);
417 self.synchronizers = Some(registered);
418 self
419 }
420 pub async fn run(
421 mut self,
422 ) -> BlockSyncResult<(JoinHandle<()>, Receiver<FeedMessage<BlockHeader>>)> {
423 trace!("Starting BlockSynchronizer...");
424 let mut state_sync_tasks = FuturesUnordered::new();
425 let mut synchronizers = self
426 .synchronizers
427 .take()
428 .ok_or(BlockSynchronizerError::NoSynchronizers)?;
429 let init_tasks = synchronizers
431 .values()
432 .map(|s| s.initialize())
433 .collect::<Vec<_>>();
434 try_join_all(init_tasks).await?;
435
436 let mut sync_streams = HashMap::with_capacity(synchronizers.len());
437 let mut sync_handles = Vec::new();
438 for (extractor_id, synchronizer) in synchronizers.drain() {
439 let (jh, rx) = synchronizer.start().await?;
440 state_sync_tasks.push(jh);
441 sync_handles.push(synchronizer);
442
443 sync_streams.insert(
444 extractor_id.clone(),
445 SynchronizerStream {
446 extractor_id,
447 state: SynchronizerState::Started,
448 modify_ts: Local::now().naive_utc(),
449 rx,
450 },
451 );
452 }
453
454 debug!("Waiting for initial synchronizer messages...");
457 let mut startup_futures = Vec::new();
458 for (id, sh) in sync_streams.iter_mut() {
459 let fut = async {
460 let res = timeout(self.block_time + self.max_wait, sh.rx.recv()).await;
461 (id.clone(), res)
462 };
463 startup_futures.push(fut);
464 }
465 let mut ready_sync_msgs = HashMap::new();
466 let initial_headers = join_all(startup_futures)
467 .await
468 .into_iter()
469 .filter_map(|(extractor_id, res)| {
470 let synchronizer = sync_streams
471 .get_mut(&extractor_id)
472 .unwrap();
473 match res {
474 Ok(Some(msg)) => {
475 debug!(%extractor_id, height=?&msg.header.number, "Synchronizer started successfully!");
476 synchronizer.state = SynchronizerState::Ready(msg.header.clone());
478 synchronizer.modify_ts = Local::now().naive_utc();
479 ready_sync_msgs.insert(extractor_id.name.clone(), msg.clone());
480 Some(msg.header)
481 }
482 Ok(None) => {
483 warn!(%extractor_id, "Dead synchronizer at startup will be purged!");
484 synchronizer.state = SynchronizerState::Ended;
485 synchronizer.modify_ts = Local::now().naive_utc();
486 None
487 }
488 Err(_) => {
489 warn!(%extractor_id, "Timed out waiting for first message: Stale synchronizer at startup will be purged!");
490 synchronizer.state = SynchronizerState::Stale(BlockHeader::default());
491 synchronizer.modify_ts = Local::now().naive_utc();
492 None
493 }
494 }
495 })
496 .collect::<HashSet<_>>() .into_iter()
498 .collect::<Vec<_>>();
499
500 let mut block_history = BlockHistory::new(initial_headers, 15)?;
501
502 let start_header = block_history
504 .latest()
505 .ok_or(BlockSynchronizerError::NoReadySynchronizers)?;
506 info!(
507 start_block=?start_header,
508 n_healthy=?ready_sync_msgs.len(),
509 "Block synchronization started successfully!"
510 );
511
512 sync_streams.retain(|_, v| matches!(v.state, SynchronizerState::Ready(_)));
517
518 for (_, stream) in sync_streams.iter_mut() {
521 if let SynchronizerState::Ready(header) = &stream.state.clone() {
522 if header.number < start_header.number {
523 stream.state = SynchronizerState::Delayed(header.clone());
524 debug!(
525 extractor_id=%stream.extractor_id,
526 synchronizer_block=?header.number,
527 current_block=?start_header.number,
528 "Marking synchronizer as delayed during initialization"
529 );
530 }
531 }
532 }
533
534 let (sync_tx, sync_rx) = mpsc::channel(30);
535 let main_loop_jh: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
536 let mut n_iter = 1;
537 loop {
538 sync_tx
540 .send(FeedMessage::new(
541 std::mem::take(&mut ready_sync_msgs),
542 sync_streams
543 .iter()
544 .map(|(a, b)| (a.name.to_string(), b.state.clone()))
545 .collect(),
546 ))
547 .await?;
548
549 if let Some(max_messages) = self.max_messages {
551 if n_iter >= max_messages {
552 info!(max_messages, "StreamEnd");
553 return Ok(());
554 }
555 }
556 n_iter += 1;
557
558 let mut recv_futures = Vec::new();
568 for (extractor_id, sh) in sync_streams.iter_mut() {
569 recv_futures.push(async {
570 let res = sh
571 .try_advance(
572 &block_history,
573 self.block_time + self.max_wait,
574 self.block_time
575 .mul_f64(self.max_missed_blocks as f64),
576 )
577 .await?;
578 Ok::<_, BlockSynchronizerError>(
579 res.map(|msg| (extractor_id.name.clone(), msg)),
580 )
581 });
582 }
583 ready_sync_msgs.extend(
584 join_all(recv_futures)
585 .await
586 .into_iter()
587 .collect::<Result<Vec<_>, _>>()?
588 .into_iter()
589 .flatten(),
590 );
591
592 sync_streams.retain(|_, v| match v.state {
595 SynchronizerState::Started | SynchronizerState::Ended => false,
596 SynchronizerState::Stale(_) => false,
597 SynchronizerState::Ready(_) => true,
598 SynchronizerState::Delayed(_) => true,
599 SynchronizerState::Advanced(_) => true,
600 });
601
602 block_history.push(
603 sync_streams
604 .values()
605 .filter_map(|v| match &v.state {
606 SynchronizerState::Ready(b) => Some(b),
607 _ => None,
608 })
609 .next()
610 .ok_or(BlockSynchronizerError::NoReadySynchronizers)?
612 .clone(),
613 )?;
614 }
615 });
616
617 let nanny_jh = tokio::spawn(async move {
618 select! {
619 error = state_sync_tasks.select_next_some() => {
620 for s in sync_handles.iter_mut() {
621 if let Err(e) = s.close().await {
622 error!(error=?e, "Failed to close synchronizer: was not started!");
623 }
624 }
625 error!(?error, "State synchronizer exited");
626 },
627 error = main_loop_jh => {
628 error!(?error, "Feed main loop exited");
629 }
630 }
631 });
632 Ok((nanny_jh, sync_rx))
633 }
634}
635
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use async_trait::async_trait;
    use test_log::test;
    use tokio::sync::{oneshot, Mutex};
    use tycho_common::dto::Chain;

    use super::{synchronizer::SynchronizerError, *};
    use crate::feed::synchronizer::SyncResult;

    /// Test double for a state synchronizer: messages pushed with
    /// `send_header` come out of the receiver handed over by `start`.
    #[derive(Clone)]
    struct MockStateSync {
        header_tx: mpsc::Sender<StateSyncMessage<BlockHeader>>,
        header_rx: Arc<Mutex<Option<Receiver<StateSyncMessage<BlockHeader>>>>>,
        end_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
    }

    impl MockStateSync {
        /// Creates a mock backed by a channel of capacity one.
        fn new() -> Self {
            let (header_tx, header_rx) = mpsc::channel(1);
            Self {
                header_tx,
                header_rx: Arc::new(Mutex::new(Some(header_rx))),
                end_tx: Arc::new(Mutex::new(None)),
            }
        }

        /// Emits `header` as if the synchronizer had produced it.
        async fn send_header(&self, header: StateSyncMessage<BlockHeader>) {
            self.header_tx
                .send(header)
                .await
                .expect("sending header failed");
        }
    }

    #[async_trait]
    impl StateSynchronizer for MockStateSync {
        async fn initialize(&self) -> SyncResult<()> {
            Ok(())
        }

        /// Hands out the message receiver (once) and spawns a task that runs
        /// until `close` is called.
        async fn start(
            &self,
        ) -> SyncResult<(JoinHandle<SyncResult<()>>, Receiver<StateSyncMessage<BlockHeader>>)>
        {
            let block_rx = self
                .header_rx
                .lock()
                .await
                .take()
                .expect("Block receiver was not set!");

            let (end_tx, end_rx) = oneshot::channel();
            *self.end_tx.lock().await = Some(end_tx);

            let jh = tokio::spawn(async move {
                let _ = end_rx.await;
                SyncResult::Ok(())
            });

            Ok((jh, block_rx))
        }

        /// Signals the task spawned by `start` to finish; fails if `start`
        /// was never called (or `close` was already called).
        async fn close(&mut self) -> SyncResult<()> {
            let pending = self.end_tx.lock().await.take();
            match pending {
                Some(tx) => {
                    tx.send(())
                        .expect("end channel closed!");
                    Ok(())
                }
                None => Err(SynchronizerError::CloseError("Not Started".to_string())),
            }
        }
    }

    /// Identity of an Ethereum extractor with the given name.
    fn extractor(name: &str) -> ExtractorIdentity {
        ExtractorIdentity { chain: Chain::Ethereum, name: name.to_string() }
    }

    /// Builds the synchronizer shared by all tests (500ms block time, 50ms
    /// max wait, 10 missed blocks) with `uniswap-v2` and `uniswap-v3`
    /// registered, and returns it along with handles to both mocks.
    fn two_extractor_setup() -> (BlockSynchronizer<MockStateSync>, MockStateSync, MockStateSync) {
        let v2_sync = MockStateSync::new();
        let v3_sync = MockStateSync::new();
        let block_sync = BlockSynchronizer::new(
            std::time::Duration::from_millis(500),
            std::time::Duration::from_millis(50),
            10,
        )
        .register_synchronizer(extractor("uniswap-v2"), v2_sync.clone())
        .register_synchronizer(extractor("uniswap-v3"), v3_sync.clone());
        (block_sync, v2_sync, v3_sync)
    }

    /// Message whose header only sets the block number.
    fn bare_block_msg(number: u64) -> StateSyncMessage<BlockHeader> {
        StateSyncMessage {
            header: BlockHeader { number, ..Default::default() },
            ..Default::default()
        }
    }

    /// Message for block `n` (`n >= 1`) of a chain where block `n` has hash
    /// `[n]` and parent hash `[n - 1]`.
    fn chained_block_msg(n: u8) -> StateSyncMessage<BlockHeader> {
        StateSyncMessage {
            header: BlockHeader {
                number: u64::from(n),
                hash: Bytes::from(vec![n]),
                parent_hash: Bytes::from(vec![n - 1]),
                revert: false,
                ..Default::default()
            },
            ..Default::default()
        }
    }

    #[test(tokio::test)]
    async fn test_two_ready_synchronizers() {
        let (block_sync, v2_sync, v3_sync) = two_extractor_setup();

        let start_msg = bare_block_msg(1);
        v2_sync
            .send_header(start_msg.clone())
            .await;
        v3_sync
            .send_header(start_msg.clone())
            .await;

        let (_jh, mut rx) = block_sync
            .run()
            .await
            .expect("BlockSynchronizer failed to start.");
        let first_feed_msg = rx
            .recv()
            .await
            .expect("header channel was closed");

        let second_msg = bare_block_msg(2);
        v2_sync
            .send_header(second_msg.clone())
            .await;
        v3_sync
            .send_header(second_msg.clone())
            .await;
        let second_feed_msg = rx
            .recv()
            .await
            .expect("header channel was closed!");

        let exp1 = FeedMessage {
            state_msgs: [
                ("uniswap-v2".to_string(), start_msg.clone()),
                ("uniswap-v3".to_string(), start_msg.clone()),
            ]
            .into_iter()
            .collect(),
            sync_states: [
                ("uniswap-v3".to_string(), SynchronizerState::Ready(start_msg.header.clone())),
                ("uniswap-v2".to_string(), SynchronizerState::Ready(start_msg.header.clone())),
            ]
            .into_iter()
            .collect(),
        };
        let exp2 = FeedMessage {
            state_msgs: [
                ("uniswap-v2".to_string(), second_msg.clone()),
                ("uniswap-v3".to_string(), second_msg.clone()),
            ]
            .into_iter()
            .collect(),
            sync_states: [
                ("uniswap-v3".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
                ("uniswap-v2".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
            ]
            .into_iter()
            .collect(),
        };
        assert_eq!(first_feed_msg, exp1);
        assert_eq!(second_feed_msg, exp2);
    }

    #[test(tokio::test)]
    async fn test_delayed_synchronizer_catches_up() {
        let (block_sync, v2_sync, v3_sync) = two_extractor_setup();

        // Both extractors start on block 1.
        let block1_msg = chained_block_msg(1);
        v2_sync
            .send_header(block1_msg.clone())
            .await;
        v3_sync
            .send_header(block1_msg.clone())
            .await;

        let (_jh, mut rx) = block_sync
            .run()
            .await
            .expect("BlockSynchronizer failed to start.");

        let first_feed_msg = rx
            .recv()
            .await
            .expect("header channel was closed");
        assert_eq!(first_feed_msg.state_msgs.len(), 2);
        assert!(matches!(
            first_feed_msg
                .sync_states
                .get("uniswap-v2")
                .unwrap(),
            SynchronizerState::Ready(_)
        ));
        assert!(matches!(
            first_feed_msg
                .sync_states
                .get("uniswap-v3")
                .unwrap(),
            SynchronizerState::Ready(_)
        ));

        // Only v2 delivers block 2, so v3 has to be reported as delayed.
        let block2_msg = chained_block_msg(2);
        v2_sync
            .send_header(block2_msg.clone())
            .await;

        let second_feed_msg = rx
            .recv()
            .await
            .expect("header channel was closed");
        assert!(second_feed_msg
            .state_msgs
            .contains_key("uniswap-v2"));
        assert!(matches!(
            second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
            SynchronizerState::Ready(header) if header.number == 2
        ));
        assert!(!second_feed_msg
            .state_msgs
            .contains_key("uniswap-v3"));
        assert!(matches!(
            second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
            SynchronizerState::Delayed(header) if header.number == 1
        ));

        // v3 now delivers the missed block 2, and both deliver block 3.
        v3_sync
            .send_header(block2_msg.clone())
            .await;

        let block3_msg = chained_block_msg(3);
        v2_sync
            .send_header(block3_msg.clone())
            .await;
        v3_sync.send_header(block3_msg).await;

        let third_feed_msg = rx
            .recv()
            .await
            .expect("header channel was closed");
        assert!(third_feed_msg
            .state_msgs
            .contains_key("uniswap-v2"));
        assert!(third_feed_msg
            .state_msgs
            .contains_key("uniswap-v3"));
        assert!(matches!(
            third_feed_msg.sync_states.get("uniswap-v2").unwrap(),
            SynchronizerState::Ready(header) if header.number == 3
        ));
        assert!(matches!(
            third_feed_msg.sync_states.get("uniswap-v3").unwrap(),
            SynchronizerState::Ready(header) if header.number == 3
        ));
    }

    #[test(tokio::test)]
    async fn test_different_start_blocks() {
        let (block_sync, v2_sync, v3_sync) = two_extractor_setup();

        let block1_msg = chained_block_msg(1);
        let block2_msg = chained_block_msg(2);

        // v2 starts one block behind v3.
        v2_sync
            .send_header(block1_msg.clone())
            .await;
        v3_sync
            .send_header(block2_msg.clone())
            .await;

        let (_jh, mut rx) = block_sync
            .run()
            .await
            .expect("BlockSynchronizer failed to start.");

        let first_feed_msg = rx
            .recv()
            .await
            .expect("header channel was closed");
        assert!(matches!(
            first_feed_msg.sync_states.get("uniswap-v2").unwrap(),
            SynchronizerState::Delayed(header) if header.number == 1
        ));
        assert!(matches!(
            first_feed_msg.sync_states.get("uniswap-v3").unwrap(),
            SynchronizerState::Ready(header) if header.number == 2
        ));

        // v2 delivers the block it was missing, then both deliver block 3.
        v2_sync
            .send_header(block2_msg.clone())
            .await;

        let block3_msg = chained_block_msg(3);
        v2_sync
            .send_header(block3_msg.clone())
            .await;
        v3_sync
            .send_header(block3_msg.clone())
            .await;

        let second_feed_msg = rx
            .recv()
            .await
            .expect("header channel was closed");
        assert_eq!(second_feed_msg.state_msgs.len(), 2);
        assert!(matches!(
            second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
            SynchronizerState::Ready(header) if header.number == 3
        ));
        assert!(matches!(
            second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
            SynchronizerState::Ready(header) if header.number == 3
        ));
    }
}