1use std::collections::{HashMap, HashSet};
2
3use chrono::{Duration as ChronoDuration, Local, NaiveDateTime};
4use futures03::{
5 future::{join_all, try_join_all},
6 stream::FuturesUnordered,
7 StreamExt,
8};
9use serde::{Deserialize, Serialize};
10use tokio::{
11 select,
12 sync::mpsc::{self, Receiver},
13 task::JoinHandle,
14 time::timeout,
15};
16use tracing::{debug, error, info, trace, warn};
17use tycho_common::{
18 dto::{Block, ExtractorIdentity},
19 Bytes,
20};
21
22use crate::feed::{
23 block_history::{BlockHistory, BlockPosition},
24 synchronizer::{StateSyncMessage, StateSynchronizer},
25};
26
27mod block_history;
28pub mod component_tracker;
29pub mod synchronizer;
30
/// Lightweight description of a block as tracked by the feed.
///
/// Headers are compared, hashed and (de)serialized, so all fields take part in
/// equality and in the serialized form.
#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, Eq, Hash)]
pub struct Header {
    /// Hash of the block.
    pub hash: Bytes,
    /// Number of the block.
    pub number: u64,
    /// Hash of the block's parent.
    pub parent_hash: Bytes,
    /// Revert flag supplied by whoever builds the header.
    // NOTE(review): presumably marks a header that was emitted as part of a chain revert —
    // confirm against the producers of `StateSyncMessage`.
    pub revert: bool,
}
38
39impl Header {
40 fn from_block(block: &Block, revert: bool) -> Self {
41 Self {
42 hash: block.hash.clone(),
43 number: block.number,
44 parent_hash: block.parent_hash.clone(),
45 revert,
46 }
47 }
48}
49
/// Result type used by the block synchronizer; errors are reported as `anyhow::Error`.
type BlockSyncResult<T> = anyhow::Result<T>;
51
/// Drives several state synchronizers and aligns their messages block by block.
///
/// Built with `new`, populated with `register_synchronizer` and consumed by `run`.
pub struct BlockSynchronizer<S> {
    /// Registered synchronizers keyed by extractor. `None` until the first registration;
    /// `run` takes the map out of this field.
    synchronizers: Option<HashMap<ExtractorIdentity, S>>,
    /// Block time; added to `max_wait` to form receive timeouts and multiplied by
    /// `max_missed_blocks` to form the staleness threshold.
    block_time: std::time::Duration,
    /// Extra waiting time granted on top of `block_time` when receiving messages.
    max_wait: std::time::Duration,
    /// Optional limit on the number of feed messages emitted before the main loop stops.
    max_messages: Option<usize>,
    /// Multiplier applied to `block_time` to obtain the staleness threshold.
    max_missed_blocks: u64,
}
105
/// State of a single synchronizer stream as reported in every `FeedMessage`.
///
/// Serialized as an internally tagged enum: the variant name, lowercased, is stored
/// under the `status` key.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "lowercase")]
pub enum SynchronizerState {
    /// Initial state of a freshly created stream, before any message was received.
    Started,
    /// The last header received was the next expected block.
    Ready(Header),
    /// The stream did not deliver a message in time (at startup) or fell behind for longer
    /// than the staleness threshold; stale streams are removed by the main loop.
    Stale(Header),
    /// The stream is behind the block history but not (yet) considered stale.
    Delayed(Header),
    /// The stream's header was classified as `BlockPosition::Advanced` by the block history.
    Advanced(Header),
    /// The stream's channel was closed; ended streams are removed by the main loop.
    Ended,
}
120
/// Receiving side of one state synchronizer together with its tracked state.
pub struct SynchronizerStream {
    /// Extractor this stream belongs to; used for logging and as the key in feed messages.
    extractor_id: ExtractorIdentity,
    /// Current state of the stream.
    state: SynchronizerState,
    /// Timestamp (naive UTC) of the last state update caused by a received message or a
    /// closed channel; used as reference point for staleness detection.
    modify_ts: NaiveDateTime,
    /// Channel on which the synchronizer delivers its messages.
    rx: Receiver<StateSyncMessage>,
}
127
128impl SynchronizerStream {
129 async fn try_advance(
130 &mut self,
131 block_history: &BlockHistory,
132 max_wait: std::time::Duration,
133 stale_threshold: std::time::Duration,
134 ) -> Option<StateSyncMessage> {
135 let extractor_id = &self.extractor_id;
136 let latest_block = block_history.latest();
137 match &self.state {
138 SynchronizerState::Started | SynchronizerState::Ended => {
139 warn!(state=?&self.state, "Advancing Synchronizer in this state not supported!");
140 None
141 }
142 SynchronizerState::Advanced(b) => {
143 let future_block = b.clone();
144 self.transition(future_block, block_history, stale_threshold);
146 None
147 }
148 SynchronizerState::Ready(previous_block) => {
149 self.try_recv_next_expected(
151 max_wait,
152 block_history,
153 previous_block.clone(),
154 stale_threshold,
155 )
156 .await
157 }
159 SynchronizerState::Delayed(old_block) => {
160 debug!(
162 ?old_block,
163 ?latest_block,
164 %extractor_id,
165 "Trying to catch up to latest block"
166 );
167 self.try_catch_up(block_history, max_wait, stale_threshold)
168 .await
169 }
170 SynchronizerState::Stale(old_block) => {
171 debug!(
172 ?old_block,
173 ?latest_block,
174 %extractor_id,
175 "Trying to catch up to latest block"
176 );
177 self.try_catch_up(block_history, max_wait, stale_threshold)
178 .await
179 }
180 }
181 }
182
183 async fn try_recv_next_expected(
188 &mut self,
189 max_wait: std::time::Duration,
190 block_history: &BlockHistory,
191 previous_block: Header,
192 stale_threshold: std::time::Duration,
193 ) -> Option<StateSyncMessage> {
194 let extractor_id = &self.extractor_id;
195 match timeout(max_wait, self.rx.recv()).await {
196 Ok(Some(msg)) => {
197 self.transition(msg.header.clone(), block_history, stale_threshold);
198 Some(msg)
199 }
200 Ok(None) => {
201 error!(
202 %extractor_id,
203 ?previous_block,
204 "Extractor terminated: channel closed!"
205 );
206 self.state = SynchronizerState::Ended;
207 self.modify_ts = Local::now().naive_utc();
208 None
209 }
210 Err(_) => {
211 debug!(%extractor_id, ?previous_block, "Extractor did not check in within time.");
213 self.state = SynchronizerState::Delayed(previous_block.clone());
214 None
215 }
216 }
217 }
218
219 async fn try_catch_up(
225 &mut self,
226 block_history: &BlockHistory,
227 max_wait: std::time::Duration,
228 stale_threshold: std::time::Duration,
229 ) -> Option<StateSyncMessage> {
230 let mut results = Vec::new();
231 let extractor_id = &self.extractor_id;
232
233 let deadline = std::time::Instant::now() + max_wait;
235
236 while std::time::Instant::now() < deadline {
237 match timeout(
238 deadline.saturating_duration_since(std::time::Instant::now()),
239 self.rx.recv(),
240 )
241 .await
242 {
243 Ok(Some(msg)) => {
244 debug!(%extractor_id, block_num=?msg.header.number, "Received new message during catch-up");
245 let block_pos = block_history
246 .determine_block_position(&msg.header)
247 .unwrap();
248 results.push(msg);
249 if matches!(block_pos, BlockPosition::NextExpected) {
250 break;
251 }
252 }
253 Ok(None) => {
254 warn!(%extractor_id, "Channel closed during catch-up");
255 self.state = SynchronizerState::Ended;
256 return None;
257 }
258 Err(_) => {
259 debug!(%extractor_id, "Timed out waiting for catch-up");
260 break;
261 }
262 }
263 }
264
265 let merged = results
266 .into_iter()
267 .reduce(|l, r| l.merge(r));
268
269 if let Some(msg) = merged {
270 debug!(?extractor_id, "Delayed extractor made progress!");
272 self.transition(msg.header.clone(), block_history, stale_threshold);
273 Some(msg)
274 } else {
275 None
276 }
277 }
278
279 fn transition(
286 &mut self,
287 latest_retrieved: Header,
288 block_history: &BlockHistory,
289 stale_threshold: std::time::Duration,
290 ) {
291 let extractor_id = &self.extractor_id;
292 let last_message_at = self.modify_ts;
293 let block = &latest_retrieved;
294
295 match block_history
296 .determine_block_position(&latest_retrieved)
297 .expect("Block positiion could not be determined.")
298 {
299 BlockPosition::NextExpected => {
300 self.state = SynchronizerState::Ready(latest_retrieved.clone());
301 }
302 BlockPosition::Latest | BlockPosition::Delayed => {
303 let now = Local::now().naive_utc();
304 let wait_duration = self
305 .modify_ts
306 .signed_duration_since(now);
307 let stale_threshold_chrono = ChronoDuration::from_std(stale_threshold)
308 .expect("Staleness threshold could not convert to chrono Duration");
309 if wait_duration > stale_threshold_chrono {
310 warn!(
311 ?extractor_id,
312 ?last_message_at,
313 ?block,
314 "Extractor transition to stale."
315 );
316 self.state = SynchronizerState::Stale(latest_retrieved.clone());
317 } else {
318 warn!(
319 ?extractor_id,
320 ?last_message_at,
321 ?block,
322 "Extractor transition to delayed."
323 );
324 self.state = SynchronizerState::Delayed(latest_retrieved.clone());
325 }
326 }
327 BlockPosition::Advanced => {
328 error!(
329 ?extractor_id,
330 ?last_message_at,
331 ?block,
332 "Extractor transition to advanced."
333 );
334 self.state = SynchronizerState::Advanced(latest_retrieved.clone());
335 }
336 }
337 self.modify_ts = Local::now().naive_utc();
338 }
339}
340
/// Message emitted by the block synchronizer once per main-loop iteration.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct FeedMessage {
    /// Messages received since the previous feed message, keyed by extractor name.
    pub state_msgs: HashMap<String, StateSyncMessage>,
    /// State of every stream still tracked, keyed by extractor name.
    pub sync_states: HashMap<String, SynchronizerState>,
}
346
347impl FeedMessage {
348 fn new(
349 state_msgs: HashMap<String, StateSyncMessage>,
350 sync_states: HashMap<String, SynchronizerState>,
351 ) -> Self {
352 Self { state_msgs, sync_states }
353 }
354}
355
356impl<S> BlockSynchronizer<S>
357where
358 S: StateSynchronizer,
359{
360 pub fn new(
361 block_time: std::time::Duration,
362 max_wait: std::time::Duration,
363 max_missed_blocks: u64,
364 ) -> Self {
365 Self { synchronizers: None, max_messages: None, block_time, max_wait, max_missed_blocks }
366 }
367
368 pub fn max_messages(&mut self, val: usize) {
369 self.max_messages = Some(val);
370 }
371
372 pub fn register_synchronizer(mut self, id: ExtractorIdentity, synchronizer: S) -> Self {
373 let mut registered = self.synchronizers.unwrap_or_default();
374 registered.insert(id, synchronizer);
375 self.synchronizers = Some(registered);
376 self
377 }
378 pub async fn run(mut self) -> BlockSyncResult<(JoinHandle<()>, Receiver<FeedMessage>)> {
379 trace!("Starting BlockSynchronizer...");
380 let mut state_sync_tasks = FuturesUnordered::new();
381 let mut synchronizers = self
382 .synchronizers
383 .take()
384 .expect("No synchronizers set!");
385 let init_tasks = synchronizers
387 .values()
388 .map(|s| s.initialize())
389 .collect::<Vec<_>>();
390 try_join_all(init_tasks).await?;
391
392 let mut sync_streams = HashMap::with_capacity(synchronizers.len());
393 let mut sync_handles = Vec::new();
394 for (extractor_id, synchronizer) in synchronizers.drain() {
395 let (jh, rx) = synchronizer.start().await?;
396 state_sync_tasks.push(jh);
397 sync_handles.push(synchronizer);
398
399 sync_streams.insert(
400 extractor_id.clone(),
401 SynchronizerStream {
402 extractor_id,
403 state: SynchronizerState::Started,
404 modify_ts: Local::now().naive_utc(),
405 rx,
406 },
407 );
408 }
409
410 debug!("Waiting for initial synchronizer messages...");
413 let mut startup_futures = Vec::new();
414 for (id, sh) in sync_streams.iter_mut() {
415 let fut = async {
416 let res = timeout(self.block_time + self.max_wait, sh.rx.recv()).await;
417 (id.clone(), res)
418 };
419 startup_futures.push(fut);
420 }
421 let mut ready_sync_msgs = HashMap::new();
422 let initial_headers = join_all(startup_futures)
423 .await
424 .into_iter()
425 .filter_map(|(extractor_id, res)| {
426 let synchronizer = sync_streams
427 .get_mut(&extractor_id)
428 .unwrap();
429 match res {
430 Ok(Some(msg)) => {
431 debug!(%extractor_id, height=?&msg.header.number, "Synchronizer started successfully!");
432 synchronizer.state = SynchronizerState::Ready(msg.header.clone());
434 synchronizer.modify_ts = Local::now().naive_utc();
435 ready_sync_msgs.insert(extractor_id.name.clone(), msg.clone());
436 Some(msg.header)
437 }
438 Ok(None) => {
439 warn!(%extractor_id, "Dead synchronizer at startup will be purged!");
440 synchronizer.state = SynchronizerState::Ended;
441 synchronizer.modify_ts = Local::now().naive_utc();
442 None
443 }
444 Err(_) => {
445 warn!(%extractor_id, "Timed out waiting for first message: Stale synchronizer at startup will be purged!");
446 synchronizer.state = SynchronizerState::Stale(Header::default());
447 synchronizer.modify_ts = Local::now().naive_utc();
448 None
449 }
450 }
451 })
452 .collect::<HashSet<_>>() .into_iter()
454 .collect::<Vec<_>>();
455
456 let mut block_history = BlockHistory::new(initial_headers, 15);
457
458 let start_header = block_history
460 .latest()
461 .ok_or_else(|| anyhow::anyhow!("No synchronizers were ready for the operation"))?;
462 info!(
463 start_block=?start_header,
464 n_healthy=?ready_sync_msgs.len(),
465 "Block synchronization started successfully!"
466 );
467
468 sync_streams.retain(|_, v| matches!(v.state, SynchronizerState::Ready(_)));
473
474 for (_, stream) in sync_streams.iter_mut() {
477 if let SynchronizerState::Ready(header) = &stream.state.clone() {
478 if header.number < start_header.number {
479 stream.state = SynchronizerState::Delayed(header.clone());
480 debug!(
481 extractor_id=%stream.extractor_id,
482 synchronizer_block=?header.number,
483 current_block=?start_header.number,
484 "Marking synchronizer as delayed during initialization"
485 );
486 }
487 }
488 }
489
490 let (sync_tx, sync_rx) = mpsc::channel(30);
491 let main_loop_jh: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
492 let mut n_iter = 1;
493 loop {
494 sync_tx
496 .send(FeedMessage::new(
497 std::mem::take(&mut ready_sync_msgs),
498 sync_streams
499 .iter()
500 .map(|(a, b)| (a.name.to_string(), b.state.clone()))
501 .collect(),
502 ))
503 .await?;
504
505 if let Some(max_messages) = self.max_messages {
507 if n_iter >= max_messages {
508 info!(max_messages, "StreamEnd");
509 return Ok(());
510 }
511 }
512 n_iter += 1;
513
514 let mut recv_futures = Vec::new();
524 for (extractor_id, sh) in sync_streams.iter_mut() {
525 recv_futures.push(async {
526 let res = sh
527 .try_advance(
528 &block_history,
529 self.block_time + self.max_wait,
530 self.block_time
531 .mul_f64(self.max_missed_blocks as f64),
532 )
533 .await;
534 res.map(|msg| (extractor_id.name.clone(), msg))
535 });
536 }
537 ready_sync_msgs.extend(
538 join_all(recv_futures)
539 .await
540 .into_iter()
541 .flatten(),
542 );
543
544 sync_streams.retain(|_, v| match v.state {
547 SynchronizerState::Started | SynchronizerState::Ended => false,
548 SynchronizerState::Stale(_) => false,
549 SynchronizerState::Ready(_) => true,
550 SynchronizerState::Delayed(_) => true,
551 SynchronizerState::Advanced(_) => true,
552 });
553
554 block_history
555 .push(
556 sync_streams
557 .values()
558 .filter_map(|v| match &v.state {
559 SynchronizerState::Ready(b) => Some(b),
560 _ => None,
561 })
562 .next()
563 .ok_or_else(|| {
565 anyhow::format_err!("Not a single synchronizer was ready.")
566 })?
567 .clone(),
568 )
569 .map_err(|err| anyhow::format_err!("Failed processing new block: {}", err))?;
570 }
571 });
572
573 let nanny_jh = tokio::spawn(async move {
574 select! {
575 error = state_sync_tasks.select_next_some() => {
576 for s in sync_handles.iter_mut() {
577 s.close().await.expect("StateSynchronizer was not started!");
578 }
579 error!(?error, "state synchronizer exited");
580 },
581 error = main_loop_jh => {
582 error!(?error, "feed main loop exited");
583 }
584 }
585 });
586 Ok((nanny_jh, sync_rx))
587 }
588}
589
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use async_trait::async_trait;
    use test_log::test;
    use tokio::sync::{oneshot, Mutex};
    use tycho_common::dto::Chain;

    use super::*;
    use crate::feed::synchronizer::SyncResult;

    /// Test double for a state synchronizer: messages pushed with `send_header` are
    /// delivered on the receiver handed out by `start`.
    #[derive(Clone)]
    struct MockStateSync {
        header_tx: mpsc::Sender<StateSyncMessage>,
        header_rx: Arc<Mutex<Option<Receiver<StateSyncMessage>>>>,
        end_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
    }

    impl MockStateSync {
        fn new() -> Self {
            let (header_tx, header_rx) = mpsc::channel(1);
            Self {
                header_tx,
                header_rx: Arc::new(Mutex::new(Some(header_rx))),
                end_tx: Arc::new(Mutex::new(None)),
            }
        }

        /// Queues `header` for delivery; waits while the single-slot channel is full.
        async fn send_header(&self, header: StateSyncMessage) {
            self.header_tx
                .send(header)
                .await
                .expect("sending header failed");
        }
    }

    #[async_trait]
    impl StateSynchronizer for MockStateSync {
        async fn initialize(&self) -> SyncResult<()> {
            Ok(())
        }

        async fn start(
            &self,
        ) -> SyncResult<(JoinHandle<SyncResult<()>>, Receiver<StateSyncMessage>)> {
            // The receiver can be handed out exactly once.
            let block_rx = self
                .header_rx
                .lock()
                .await
                .take()
                .expect("Block receiver was not set!");

            let (end_tx, end_rx) = oneshot::channel();
            *self.end_tx.lock().await = Some(end_tx);

            // The mock task simply runs until `close` signals the end.
            let jh = tokio::spawn(async move {
                let _ = end_rx.await;
                SyncResult::Ok(())
            });

            Ok((jh, block_rx))
        }

        async fn close(&mut self) -> SyncResult<()> {
            let mut guard = self.end_tx.lock().await;
            match guard.take() {
                Some(tx) => {
                    tx.send(())
                        .expect("end channel closed!");
                    Ok(())
                }
                None => Err(anyhow::format_err!("Not connected")),
            }
        }
    }

    const V2: &str = "uniswap-v2";
    const V3: &str = "uniswap-v3";

    /// Builds two mocks registered as `uniswap-v2` and `uniswap-v3` on Ethereum with a
    /// 500ms block time, 50ms max wait and 10 tolerated missed blocks.
    fn two_extractor_setup() -> (MockStateSync, MockStateSync, BlockSynchronizer<MockStateSync>) {
        let v2_sync = MockStateSync::new();
        let v3_sync = MockStateSync::new();
        let block_sync = BlockSynchronizer::new(
            std::time::Duration::from_millis(500),
            std::time::Duration::from_millis(50),
            10,
        )
        .register_synchronizer(
            ExtractorIdentity { chain: Chain::Ethereum, name: V2.to_string() },
            v2_sync.clone(),
        )
        .register_synchronizer(
            ExtractorIdentity { chain: Chain::Ethereum, name: V3.to_string() },
            v3_sync.clone(),
        );
        (v2_sync, v3_sync, block_sync)
    }

    /// Message whose header only carries a block number; hashes keep their defaults.
    fn bare_block(number: u64) -> StateSyncMessage {
        StateSyncMessage {
            header: Header { number, ..Default::default() },
            ..Default::default()
        }
    }

    /// Message for block `n` with hash `[n]` and parent hash `[n - 1]`.
    fn chained_block(n: u8) -> StateSyncMessage {
        StateSyncMessage {
            header: Header {
                number: u64::from(n),
                hash: Bytes::from(vec![n]),
                parent_hash: Bytes::from(vec![n - 1]),
                revert: false,
            },
            ..Default::default()
        }
    }

    /// Feed message expected when both extractors delivered `msg` and are ready on it.
    fn both_ready_on(msg: &StateSyncMessage) -> FeedMessage {
        let names = [V2, V3];
        FeedMessage {
            state_msgs: names
                .iter()
                .map(|name| (name.to_string(), msg.clone()))
                .collect(),
            sync_states: names
                .iter()
                .map(|name| (name.to_string(), SynchronizerState::Ready(msg.header.clone())))
                .collect(),
        }
    }

    fn is_ready(feed: &FeedMessage, name: &str) -> bool {
        matches!(feed.sync_states.get(name).unwrap(), SynchronizerState::Ready(_))
    }

    fn is_ready_at(feed: &FeedMessage, name: &str, number: u64) -> bool {
        matches!(
            feed.sync_states.get(name).unwrap(),
            SynchronizerState::Ready(header) if header.number == number
        )
    }

    fn is_delayed_at(feed: &FeedMessage, name: &str, number: u64) -> bool {
        matches!(
            feed.sync_states.get(name).unwrap(),
            SynchronizerState::Delayed(header) if header.number == number
        )
    }

    #[test(tokio::test)]
    async fn test_two_ready_synchronizers() {
        let (v2_sync, v3_sync, block_sync) = two_extractor_setup();

        let start_msg = bare_block(1);
        v2_sync
            .send_header(start_msg.clone())
            .await;
        v3_sync
            .send_header(start_msg.clone())
            .await;

        let (_jh, mut rx) = block_sync
            .run()
            .await
            .expect("BlockSynchronizer failed to start.");
        let first_feed_msg = rx
            .recv()
            .await
            .expect("header channel was closed");

        let second_msg = bare_block(2);
        v2_sync
            .send_header(second_msg.clone())
            .await;
        v3_sync
            .send_header(second_msg.clone())
            .await;
        let second_feed_msg = rx
            .recv()
            .await
            .expect("header channel was closed!");

        let exp1 = both_ready_on(&start_msg);
        let exp2 = both_ready_on(&second_msg);
        assert_eq!(first_feed_msg, exp1);
        assert_eq!(second_feed_msg, exp2);
    }

    #[test(tokio::test)]
    async fn test_delayed_synchronizer_catches_up() {
        let (v2_sync, v3_sync, block_sync) = two_extractor_setup();

        // Both synchronizers start on block 1.
        let block1_msg = chained_block(1);
        v2_sync
            .send_header(block1_msg.clone())
            .await;
        v3_sync
            .send_header(block1_msg.clone())
            .await;

        let (_jh, mut rx) = block_sync
            .run()
            .await
            .expect("BlockSynchronizer failed to start.");

        let first_feed_msg = rx
            .recv()
            .await
            .expect("header channel was closed");
        assert_eq!(first_feed_msg.state_msgs.len(), 2);
        assert!(is_ready(&first_feed_msg, V2));
        assert!(is_ready(&first_feed_msg, V3));

        // Only v2 delivers block 2, so v3 falls behind.
        let block2_msg = chained_block(2);
        v2_sync
            .send_header(block2_msg.clone())
            .await;

        let second_feed_msg = rx
            .recv()
            .await
            .expect("header channel was closed");
        assert!(second_feed_msg
            .state_msgs
            .contains_key(V2));
        assert!(is_ready_at(&second_feed_msg, V2, 2));
        assert!(!second_feed_msg
            .state_msgs
            .contains_key(V3));
        assert!(is_delayed_at(&second_feed_msg, V3, 1));

        // v3 receives the missed block 2, then both receive block 3.
        v3_sync
            .send_header(block2_msg.clone())
            .await;

        let block3_msg = chained_block(3);
        v2_sync
            .send_header(block3_msg.clone())
            .await;
        v3_sync.send_header(block3_msg).await;

        let third_feed_msg = rx
            .recv()
            .await
            .expect("header channel was closed");
        assert!(third_feed_msg
            .state_msgs
            .contains_key(V2));
        assert!(third_feed_msg
            .state_msgs
            .contains_key(V3));
        assert!(is_ready_at(&third_feed_msg, V2, 3));
        assert!(is_ready_at(&third_feed_msg, V3, 3));
    }

    #[test(tokio::test)]
    async fn test_different_start_blocks() {
        let (v2_sync, v3_sync, block_sync) = two_extractor_setup();

        let block1_msg = chained_block(1);
        let block2_msg = chained_block(2);

        // v2 starts one block behind v3.
        v2_sync
            .send_header(block1_msg.clone())
            .await;
        v3_sync
            .send_header(block2_msg.clone())
            .await;

        let (_jh, mut rx) = block_sync
            .run()
            .await
            .expect("BlockSynchronizer failed to start.");

        let first_feed_msg = rx
            .recv()
            .await
            .expect("header channel was closed");
        assert!(is_delayed_at(&first_feed_msg, V2, 1));
        assert!(is_ready_at(&first_feed_msg, V3, 2));

        // v2 catches up with block 2, then both receive block 3.
        v2_sync
            .send_header(block2_msg.clone())
            .await;

        let block3_msg = chained_block(3);
        v2_sync
            .send_header(block3_msg.clone())
            .await;
        v3_sync
            .send_header(block3_msg.clone())
            .await;

        let second_feed_msg = rx
            .recv()
            .await
            .expect("header channel was closed");
        assert_eq!(second_feed_msg.state_msgs.len(), 2);
        assert!(is_ready_at(&second_feed_msg, V2, 3));
        assert!(is_ready_at(&second_feed_msg, V3, 3));
    }
}