1use std::collections::{HashMap, HashSet};
2
3use chrono::{Duration as ChronoDuration, Local, NaiveDateTime};
4use futures03::{
5 future::{join_all, try_join_all},
6 stream::FuturesUnordered,
7 StreamExt,
8};
9use serde::{Deserialize, Serialize};
10use thiserror::Error;
11use tokio::{
12 select,
13 sync::mpsc::{self, Receiver},
14 task::JoinHandle,
15 time::timeout,
16};
17use tracing::{debug, error, info, trace, warn};
18use tycho_common::{
19 dto::{Block, ExtractorIdentity},
20 Bytes,
21};
22
23use crate::feed::{
24 block_history::{BlockHistory, BlockHistoryError, BlockPosition},
25 synchronizer::{StateSyncMessage, StateSynchronizer, SynchronizerError},
26};
27
28mod block_history;
29pub mod component_tracker;
30pub mod synchronizer;
31
32#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, Eq, Hash)]
33pub struct Header {
34 pub hash: Bytes,
35 pub number: u64,
36 pub parent_hash: Bytes,
37 pub revert: bool,
38}
39
40impl Header {
41 fn from_block(block: &Block, revert: bool) -> Self {
42 Self {
43 hash: block.hash.clone(),
44 number: block.number,
45 parent_hash: block.parent_hash.clone(),
46 revert,
47 }
48 }
49}
50
51#[derive(Error, Debug)]
52pub enum BlockSynchronizerError {
53 #[error("Failed to initialize synchronizer: {0}")]
54 InitializationError(#[from] SynchronizerError),
55
56 #[error("Failed to process new block: {0}")]
57 BlockHistoryError(#[from] BlockHistoryError),
58
59 #[error("Not a single synchronizer was ready")]
60 NoReadySynchronizers,
61
62 #[error("No synchronizers were set")]
63 NoSynchronizers,
64
65 #[error("Failed to convert duration: {0}")]
66 DurationConversionError(String),
67}
68
69type BlockSyncResult<T> = Result<T, BlockSynchronizerError>;
70
71pub struct BlockSynchronizer<S> {
118 synchronizers: Option<HashMap<ExtractorIdentity, S>>,
119 block_time: std::time::Duration,
120 max_wait: std::time::Duration,
121 max_messages: Option<usize>,
122 max_missed_blocks: u64,
123}
124
125#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
126#[serde(tag = "status", rename_all = "lowercase")]
127pub enum SynchronizerState {
128 Started,
129 Ready(Header),
130 Stale(Header),
132 Delayed(Header),
133 Advanced(Header),
137 Ended,
138}
139
140pub struct SynchronizerStream {
141 extractor_id: ExtractorIdentity,
142 state: SynchronizerState,
143 modify_ts: NaiveDateTime,
144 rx: Receiver<StateSyncMessage>,
145}
146
147impl SynchronizerStream {
148 async fn try_advance(
149 &mut self,
150 block_history: &BlockHistory,
151 max_wait: std::time::Duration,
152 stale_threshold: std::time::Duration,
153 ) -> BlockSyncResult<Option<StateSyncMessage>> {
154 let extractor_id = self.extractor_id.clone();
155 let latest_block = block_history.latest();
156 match &self.state {
157 SynchronizerState::Started | SynchronizerState::Ended => {
158 warn!(state=?&self.state, "Advancing Synchronizer in this state not supported!");
159 Ok(None)
160 }
161 SynchronizerState::Advanced(b) => {
162 let future_block = b.clone();
163 self.transition(future_block, block_history, stale_threshold)?;
165 Ok(None)
166 }
167 SynchronizerState::Ready(previous_block) => {
168 self.try_recv_next_expected(
170 max_wait,
171 block_history,
172 previous_block.clone(),
173 stale_threshold,
174 )
175 .await
176 }
178 SynchronizerState::Delayed(old_block) => {
179 debug!(
181 ?old_block,
182 ?latest_block,
183 %extractor_id,
184 "Trying to catch up to latest block"
185 );
186 self.try_catch_up(block_history, max_wait, stale_threshold)
187 .await
188 }
189 SynchronizerState::Stale(old_block) => {
190 debug!(
191 ?old_block,
192 ?latest_block,
193 %extractor_id,
194 "Trying to catch up to latest block"
195 );
196 self.try_catch_up(block_history, max_wait, stale_threshold)
197 .await
198 }
199 }
200 }
201
202 async fn try_recv_next_expected(
207 &mut self,
208 max_wait: std::time::Duration,
209 block_history: &BlockHistory,
210 previous_block: Header,
211 stale_threshold: std::time::Duration,
212 ) -> BlockSyncResult<Option<StateSyncMessage>> {
213 let extractor_id = self.extractor_id.clone();
214 match timeout(max_wait, self.rx.recv()).await {
215 Ok(Some(msg)) => {
216 self.transition(msg.header.clone(), block_history, stale_threshold)?;
217 Ok(Some(msg))
218 }
219 Ok(None) => {
220 error!(
221 %extractor_id,
222 ?previous_block,
223 "Extractor terminated: channel closed!"
224 );
225 self.state = SynchronizerState::Ended;
226 self.modify_ts = Local::now().naive_utc();
227 Ok(None)
228 }
229 Err(_) => {
230 debug!(%extractor_id, ?previous_block, "Extractor did not check in within time.");
232 self.state = SynchronizerState::Delayed(previous_block.clone());
233 Ok(None)
234 }
235 }
236 }
237
238 async fn try_catch_up(
244 &mut self,
245 block_history: &BlockHistory,
246 max_wait: std::time::Duration,
247 stale_threshold: std::time::Duration,
248 ) -> BlockSyncResult<Option<StateSyncMessage>> {
249 let mut results = Vec::new();
250 let extractor_id = self.extractor_id.clone();
251
252 let deadline = std::time::Instant::now() + max_wait;
254
255 while std::time::Instant::now() < deadline {
256 match timeout(
257 deadline.saturating_duration_since(std::time::Instant::now()),
258 self.rx.recv(),
259 )
260 .await
261 {
262 Ok(Some(msg)) => {
263 debug!(%extractor_id, block_num=?msg.header.number, "Received new message during catch-up");
264 let block_pos = block_history.determine_block_position(&msg.header)?;
265 results.push(msg);
266 if matches!(block_pos, BlockPosition::NextExpected) {
267 break;
268 }
269 }
270 Ok(None) => {
271 warn!(%extractor_id, "Channel closed during catch-up");
272 self.state = SynchronizerState::Ended;
273 return Ok(None);
274 }
275 Err(_) => {
276 debug!(%extractor_id, "Timed out waiting for catch-up");
277 break;
278 }
279 }
280 }
281
282 let merged = results
283 .into_iter()
284 .reduce(|l, r| l.merge(r));
285
286 if let Some(msg) = merged {
287 debug!(?extractor_id, "Delayed extractor made progress!");
289 self.transition(msg.header.clone(), block_history, stale_threshold)?;
290 Ok(Some(msg))
291 } else {
292 Ok(None)
293 }
294 }
295
296 fn transition(
303 &mut self,
304 latest_retrieved: Header,
305 block_history: &BlockHistory,
306 stale_threshold: std::time::Duration,
307 ) -> Result<(), BlockSynchronizerError> {
308 let extractor_id = self.extractor_id.clone();
309 let last_message_at = self.modify_ts;
310 let block = &latest_retrieved;
311
312 match block_history.determine_block_position(&latest_retrieved)? {
313 BlockPosition::NextExpected => {
314 self.state = SynchronizerState::Ready(latest_retrieved.clone());
315 }
316 BlockPosition::Latest | BlockPosition::Delayed => {
317 let now = Local::now().naive_utc();
318 let wait_duration = self
319 .modify_ts
320 .signed_duration_since(now);
321 let stale_threshold_chrono = ChronoDuration::from_std(stale_threshold)
322 .map_err(|e| BlockSynchronizerError::DurationConversionError(e.to_string()))?;
323 if wait_duration > stale_threshold_chrono {
324 warn!(
325 ?extractor_id,
326 ?last_message_at,
327 ?block,
328 "Extractor transition to stale."
329 );
330 self.state = SynchronizerState::Stale(latest_retrieved.clone());
331 } else {
332 warn!(
333 ?extractor_id,
334 ?last_message_at,
335 ?block,
336 "Extractor transition to delayed."
337 );
338 self.state = SynchronizerState::Delayed(latest_retrieved.clone());
339 }
340 }
341 BlockPosition::Advanced => {
342 error!(
343 ?extractor_id,
344 ?last_message_at,
345 ?block,
346 "Extractor transition to advanced."
347 );
348 self.state = SynchronizerState::Advanced(latest_retrieved.clone());
349 }
350 }
351 self.modify_ts = Local::now().naive_utc();
352 Ok(())
353 }
354}
355
356#[derive(Debug, PartialEq, Serialize, Deserialize)]
357pub struct FeedMessage {
358 pub state_msgs: HashMap<String, StateSyncMessage>,
359 pub sync_states: HashMap<String, SynchronizerState>,
360}
361
362impl FeedMessage {
363 fn new(
364 state_msgs: HashMap<String, StateSyncMessage>,
365 sync_states: HashMap<String, SynchronizerState>,
366 ) -> Self {
367 Self { state_msgs, sync_states }
368 }
369}
370
371impl<S> BlockSynchronizer<S>
372where
373 S: StateSynchronizer,
374{
375 pub fn new(
376 block_time: std::time::Duration,
377 max_wait: std::time::Duration,
378 max_missed_blocks: u64,
379 ) -> Self {
380 Self { synchronizers: None, max_messages: None, block_time, max_wait, max_missed_blocks }
381 }
382
383 pub fn max_messages(&mut self, val: usize) {
384 self.max_messages = Some(val);
385 }
386
387 pub fn register_synchronizer(mut self, id: ExtractorIdentity, synchronizer: S) -> Self {
388 let mut registered = self.synchronizers.unwrap_or_default();
389 registered.insert(id, synchronizer);
390 self.synchronizers = Some(registered);
391 self
392 }
393 pub async fn run(mut self) -> BlockSyncResult<(JoinHandle<()>, Receiver<FeedMessage>)> {
394 trace!("Starting BlockSynchronizer...");
395 let mut state_sync_tasks = FuturesUnordered::new();
396 let mut synchronizers = self
397 .synchronizers
398 .take()
399 .ok_or(BlockSynchronizerError::NoSynchronizers)?;
400 let init_tasks = synchronizers
402 .values()
403 .map(|s| s.initialize())
404 .collect::<Vec<_>>();
405 try_join_all(init_tasks).await?;
406
407 let mut sync_streams = HashMap::with_capacity(synchronizers.len());
408 let mut sync_handles = Vec::new();
409 for (extractor_id, synchronizer) in synchronizers.drain() {
410 let (jh, rx) = synchronizer.start().await?;
411 state_sync_tasks.push(jh);
412 sync_handles.push(synchronizer);
413
414 sync_streams.insert(
415 extractor_id.clone(),
416 SynchronizerStream {
417 extractor_id,
418 state: SynchronizerState::Started,
419 modify_ts: Local::now().naive_utc(),
420 rx,
421 },
422 );
423 }
424
425 debug!("Waiting for initial synchronizer messages...");
428 let mut startup_futures = Vec::new();
429 for (id, sh) in sync_streams.iter_mut() {
430 let fut = async {
431 let res = timeout(self.block_time + self.max_wait, sh.rx.recv()).await;
432 (id.clone(), res)
433 };
434 startup_futures.push(fut);
435 }
436 let mut ready_sync_msgs = HashMap::new();
437 let initial_headers = join_all(startup_futures)
438 .await
439 .into_iter()
440 .filter_map(|(extractor_id, res)| {
441 let synchronizer = sync_streams
442 .get_mut(&extractor_id)
443 .unwrap();
444 match res {
445 Ok(Some(msg)) => {
446 debug!(%extractor_id, height=?&msg.header.number, "Synchronizer started successfully!");
447 synchronizer.state = SynchronizerState::Ready(msg.header.clone());
449 synchronizer.modify_ts = Local::now().naive_utc();
450 ready_sync_msgs.insert(extractor_id.name.clone(), msg.clone());
451 Some(msg.header)
452 }
453 Ok(None) => {
454 warn!(%extractor_id, "Dead synchronizer at startup will be purged!");
455 synchronizer.state = SynchronizerState::Ended;
456 synchronizer.modify_ts = Local::now().naive_utc();
457 None
458 }
459 Err(_) => {
460 warn!(%extractor_id, "Timed out waiting for first message: Stale synchronizer at startup will be purged!");
461 synchronizer.state = SynchronizerState::Stale(Header::default());
462 synchronizer.modify_ts = Local::now().naive_utc();
463 None
464 }
465 }
466 })
467 .collect::<HashSet<_>>() .into_iter()
469 .collect::<Vec<_>>();
470
471 let mut block_history = BlockHistory::new(initial_headers, 15)?;
472
473 let start_header = block_history
475 .latest()
476 .ok_or(BlockSynchronizerError::NoReadySynchronizers)?;
477 info!(
478 start_block=?start_header,
479 n_healthy=?ready_sync_msgs.len(),
480 "Block synchronization started successfully!"
481 );
482
483 sync_streams.retain(|_, v| matches!(v.state, SynchronizerState::Ready(_)));
488
489 for (_, stream) in sync_streams.iter_mut() {
492 if let SynchronizerState::Ready(header) = &stream.state.clone() {
493 if header.number < start_header.number {
494 stream.state = SynchronizerState::Delayed(header.clone());
495 debug!(
496 extractor_id=%stream.extractor_id,
497 synchronizer_block=?header.number,
498 current_block=?start_header.number,
499 "Marking synchronizer as delayed during initialization"
500 );
501 }
502 }
503 }
504
505 let (sync_tx, sync_rx) = mpsc::channel(30);
506 let main_loop_jh: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
507 let mut n_iter = 1;
508 loop {
509 sync_tx
511 .send(FeedMessage::new(
512 std::mem::take(&mut ready_sync_msgs),
513 sync_streams
514 .iter()
515 .map(|(a, b)| (a.name.to_string(), b.state.clone()))
516 .collect(),
517 ))
518 .await?;
519
520 if let Some(max_messages) = self.max_messages {
522 if n_iter >= max_messages {
523 info!(max_messages, "StreamEnd");
524 return Ok(());
525 }
526 }
527 n_iter += 1;
528
529 let mut recv_futures = Vec::new();
539 for (extractor_id, sh) in sync_streams.iter_mut() {
540 recv_futures.push(async {
541 let res = sh
542 .try_advance(
543 &block_history,
544 self.block_time + self.max_wait,
545 self.block_time
546 .mul_f64(self.max_missed_blocks as f64),
547 )
548 .await?;
549 Ok::<_, BlockSynchronizerError>(
550 res.map(|msg| (extractor_id.name.clone(), msg)),
551 )
552 });
553 }
554 ready_sync_msgs.extend(
555 join_all(recv_futures)
556 .await
557 .into_iter()
558 .collect::<Result<Vec<_>, _>>()?
559 .into_iter()
560 .flatten(),
561 );
562
563 sync_streams.retain(|_, v| match v.state {
566 SynchronizerState::Started | SynchronizerState::Ended => false,
567 SynchronizerState::Stale(_) => false,
568 SynchronizerState::Ready(_) => true,
569 SynchronizerState::Delayed(_) => true,
570 SynchronizerState::Advanced(_) => true,
571 });
572
573 block_history.push(
574 sync_streams
575 .values()
576 .filter_map(|v| match &v.state {
577 SynchronizerState::Ready(b) => Some(b),
578 _ => None,
579 })
580 .next()
581 .ok_or(BlockSynchronizerError::NoReadySynchronizers)?
583 .clone(),
584 )?;
585 }
586 });
587
588 let nanny_jh = tokio::spawn(async move {
589 select! {
590 error = state_sync_tasks.select_next_some() => {
591 for s in sync_handles.iter_mut() {
592 if let Err(e) = s.close().await {
593 error!(error=?e, "Failed to close synchronizer: was not started!");
594 }
595 }
596 error!(?error, "State synchronizer exited");
597 },
598 error = main_loop_jh => {
599 error!(?error, "Feed main loop exited");
600 }
601 }
602 });
603 Ok((nanny_jh, sync_rx))
604 }
605}
606
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use async_trait::async_trait;
    use test_log::test;
    use tokio::sync::{oneshot, Mutex};
    use tycho_common::dto::Chain;

    use super::{synchronizer::SynchronizerError, *};
    use crate::feed::synchronizer::SyncResult;

    /// Extractor names used by every test.
    const V2: &str = "uniswap-v2";
    const V3: &str = "uniswap-v3";

    /// Test double whose messages are injected by the test through `send_header`.
    #[derive(Clone)]
    struct MockStateSync {
        header_tx: mpsc::Sender<StateSyncMessage>,
        header_rx: Arc<Mutex<Option<Receiver<StateSyncMessage>>>>,
        end_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
    }

    impl MockStateSync {
        /// Creates a mock backed by a channel of capacity one.
        fn new() -> Self {
            let (header_tx, header_rx) = mpsc::channel(1);
            Self {
                header_tx,
                header_rx: Arc::new(Mutex::new(Some(header_rx))),
                end_tx: Arc::new(Mutex::new(None)),
            }
        }

        /// Queues `header` for delivery to whoever holds the receiver.
        async fn send_header(&self, header: StateSyncMessage) {
            self.header_tx
                .send(header)
                .await
                .expect("sending header failed");
        }
    }

    #[async_trait]
    impl StateSynchronizer for MockStateSync {
        async fn initialize(&self) -> SyncResult<()> {
            Ok(())
        }

        /// Hands out the receiver (only once) and spawns a task that ends when `close` fires.
        async fn start(
            &self,
        ) -> SyncResult<(JoinHandle<SyncResult<()>>, Receiver<StateSyncMessage>)> {
            let block_rx = self
                .header_rx
                .lock()
                .await
                .take()
                .expect("Block receiver was not set!");

            let (end_tx, end_rx) = oneshot::channel();
            *self.end_tx.lock().await = Some(end_tx);

            let jh = tokio::spawn(async move {
                let _ = end_rx.await;
                SyncResult::Ok(())
            });

            Ok((jh, block_rx))
        }

        async fn close(&mut self) -> SyncResult<()> {
            match self.end_tx.lock().await.take() {
                Some(tx) => {
                    tx.send(())
                        .expect("end channel closed!");
                    Ok(())
                }
                None => Err(SynchronizerError::CloseError("Not Started".to_string())),
            }
        }
    }

    /// Identity of an Ethereum extractor called `name`.
    fn extractor(name: &str) -> ExtractorIdentity {
        ExtractorIdentity { chain: Chain::Ethereum, name: name.to_string() }
    }

    /// Block synchronizer (500ms block time, 50ms max wait, 10 missed blocks) with both mocks
    /// registered.
    fn synchronizer_for(
        v2_sync: &MockStateSync,
        v3_sync: &MockStateSync,
    ) -> BlockSynchronizer<MockStateSync> {
        BlockSynchronizer::new(
            std::time::Duration::from_millis(500),
            std::time::Duration::from_millis(50),
            10,
        )
        .register_synchronizer(extractor(V2), v2_sync.clone())
        .register_synchronizer(extractor(V3), v3_sync.clone())
    }

    /// Message for block `number` with default hashes.
    fn unhashed_msg(number: u64) -> StateSyncMessage {
        StateSyncMessage { header: Header { number, ..Default::default() }, ..Default::default() }
    }

    /// Message for block `number` whose hash is `[number]` and parent hash `[number - 1]`.
    fn chained_msg(number: u8) -> StateSyncMessage {
        StateSyncMessage {
            header: Header {
                number: u64::from(number),
                hash: Bytes::from(vec![number]),
                parent_hash: Bytes::from(vec![number - 1]),
                revert: false,
            },
            ..Default::default()
        }
    }

    /// Feed message in which both extractors delivered `msg` and are ready at its header.
    fn both_ready(msg: &StateSyncMessage) -> FeedMessage {
        FeedMessage {
            state_msgs: [V2, V3]
                .into_iter()
                .map(|name| (name.to_string(), msg.clone()))
                .collect(),
            sync_states: [V3, V2]
                .into_iter()
                .map(|name| (name.to_string(), SynchronizerState::Ready(msg.header.clone())))
                .collect(),
        }
    }

    /// State reported for extractor `name` in `feed`.
    fn state_of<'a>(feed: &'a FeedMessage, name: &str) -> &'a SynchronizerState {
        feed.sync_states.get(name).unwrap()
    }

    #[test(tokio::test)]
    async fn test_two_ready_synchronizers() {
        let v2_sync = MockStateSync::new();
        let v3_sync = MockStateSync::new();
        let block_sync = synchronizer_for(&v2_sync, &v3_sync);

        let start_msg = unhashed_msg(1);
        v2_sync
            .send_header(start_msg.clone())
            .await;
        v3_sync
            .send_header(start_msg.clone())
            .await;

        let (_jh, mut rx) = block_sync
            .run()
            .await
            .expect("BlockSynchronizer failed to start.");
        let first_feed_msg = rx
            .recv()
            .await
            .expect("header channel was closed");

        let second_msg = unhashed_msg(2);
        v2_sync
            .send_header(second_msg.clone())
            .await;
        v3_sync
            .send_header(second_msg.clone())
            .await;
        let second_feed_msg = rx
            .recv()
            .await
            .expect("header channel was closed!");

        assert_eq!(first_feed_msg, both_ready(&start_msg));
        assert_eq!(second_feed_msg, both_ready(&second_msg));
    }

    #[test(tokio::test)]
    async fn test_delayed_synchronizer_catches_up() {
        let v2_sync = MockStateSync::new();
        let v3_sync = MockStateSync::new();
        let block_sync = synchronizer_for(&v2_sync, &v3_sync);

        // Both synchronizers start on block 1.
        let block1_msg = chained_msg(1);
        v2_sync
            .send_header(block1_msg.clone())
            .await;
        v3_sync
            .send_header(block1_msg.clone())
            .await;

        let (_jh, mut rx) = block_sync
            .run()
            .await
            .expect("BlockSynchronizer failed to start.");

        let first_feed_msg = rx
            .recv()
            .await
            .expect("header channel was closed");
        assert_eq!(first_feed_msg.state_msgs.len(), 2);
        assert!(matches!(state_of(&first_feed_msg, V2), SynchronizerState::Ready(_)));
        assert!(matches!(state_of(&first_feed_msg, V3), SynchronizerState::Ready(_)));

        // Only v2 reports block 2, so v3 falls behind.
        let block2_msg = chained_msg(2);
        v2_sync
            .send_header(block2_msg.clone())
            .await;

        let second_feed_msg = rx
            .recv()
            .await
            .expect("header channel was closed");
        assert!(second_feed_msg
            .state_msgs
            .contains_key(V2));
        assert!(matches!(
            state_of(&second_feed_msg, V2),
            SynchronizerState::Ready(header) if header.number == 2
        ));
        assert!(!second_feed_msg
            .state_msgs
            .contains_key(V3));
        assert!(matches!(
            state_of(&second_feed_msg, V3),
            SynchronizerState::Delayed(header) if header.number == 1
        ));

        // v3 delivers the missed block, then both report block 3.
        v3_sync
            .send_header(block2_msg.clone())
            .await;

        let block3_msg = chained_msg(3);
        v2_sync
            .send_header(block3_msg.clone())
            .await;
        v3_sync.send_header(block3_msg).await;

        let third_feed_msg = rx
            .recv()
            .await
            .expect("header channel was closed");
        assert!(third_feed_msg
            .state_msgs
            .contains_key(V2));
        assert!(third_feed_msg
            .state_msgs
            .contains_key(V3));
        assert!(matches!(
            state_of(&third_feed_msg, V2),
            SynchronizerState::Ready(header) if header.number == 3
        ));
        assert!(matches!(
            state_of(&third_feed_msg, V3),
            SynchronizerState::Ready(header) if header.number == 3
        ));
    }

    #[test(tokio::test)]
    async fn test_different_start_blocks() {
        let v2_sync = MockStateSync::new();
        let v3_sync = MockStateSync::new();
        let block_sync = synchronizer_for(&v2_sync, &v3_sync);

        // v2 starts one block behind v3.
        let block1_msg = chained_msg(1);
        let block2_msg = chained_msg(2);

        v2_sync
            .send_header(block1_msg.clone())
            .await;
        v3_sync
            .send_header(block2_msg.clone())
            .await;

        let (_jh, mut rx) = block_sync
            .run()
            .await
            .expect("BlockSynchronizer failed to start.");

        let first_feed_msg = rx
            .recv()
            .await
            .expect("header channel was closed");
        assert!(matches!(
            state_of(&first_feed_msg, V2),
            SynchronizerState::Delayed(header) if header.number == 1
        ));
        assert!(matches!(
            state_of(&first_feed_msg, V3),
            SynchronizerState::Ready(header) if header.number == 2
        ));

        // v2 catches up with block 2, then both report block 3.
        v2_sync
            .send_header(block2_msg.clone())
            .await;

        let block3_msg = chained_msg(3);
        v2_sync
            .send_header(block3_msg.clone())
            .await;
        v3_sync
            .send_header(block3_msg.clone())
            .await;

        let second_feed_msg = rx
            .recv()
            .await
            .expect("header channel was closed");
        assert_eq!(second_feed_msg.state_msgs.len(), 2);
        assert!(matches!(
            state_of(&second_feed_msg, V2),
            SynchronizerState::Ready(header) if header.number == 3
        ));
        assert!(matches!(
            state_of(&second_feed_msg, V3),
            SynchronizerState::Ready(header) if header.number == 3
        ));
    }
}