1use std::collections::{HashMap, HashSet};
2
3use chrono::{Duration as ChronoDuration, Local, NaiveDateTime};
4use futures03::{
5 future::{join_all, try_join_all},
6 stream::FuturesUnordered,
7 StreamExt,
8};
9use serde::{Deserialize, Serialize};
10use thiserror::Error;
11use tokio::{
12 select,
13 sync::mpsc::{self, Receiver},
14 task::JoinHandle,
15 time::timeout,
16};
17use tracing::{debug, error, info, trace, warn};
18use tycho_common::{
19 dto::{Block, ExtractorIdentity},
20 Bytes,
21};
22
23use crate::feed::{
24 block_history::{BlockHistory, BlockHistoryError, BlockPosition},
25 synchronizer::{StateSyncMessage, StateSynchronizer, SynchronizerError},
26};
27
28mod block_history;
29pub mod component_tracker;
30pub mod synchronizer;
31
32#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, Eq, Hash)]
33pub struct Header {
34 pub hash: Bytes,
35 pub number: u64,
36 pub parent_hash: Bytes,
37 pub revert: bool,
38 pub timestamp: u64,
39}
40
41impl Header {
42 fn from_block(block: &Block, revert: bool) -> Self {
43 Self {
44 hash: block.hash.clone(),
45 number: block.number,
46 parent_hash: block.parent_hash.clone(),
47 revert,
48 timestamp: block.ts.timestamp() as u64,
49 }
50 }
51}
52
53#[derive(Error, Debug)]
54pub enum BlockSynchronizerError {
55 #[error("Failed to initialize synchronizer: {0}")]
56 InitializationError(#[from] SynchronizerError),
57
58 #[error("Failed to process new block: {0}")]
59 BlockHistoryError(#[from] BlockHistoryError),
60
61 #[error("Not a single synchronizer was ready")]
62 NoReadySynchronizers,
63
64 #[error("No synchronizers were set")]
65 NoSynchronizers,
66
67 #[error("Failed to convert duration: {0}")]
68 DurationConversionError(String),
69}
70
71type BlockSyncResult<T> = Result<T, BlockSynchronizerError>;
72
73pub struct BlockSynchronizer<S> {
120 synchronizers: Option<HashMap<ExtractorIdentity, S>>,
121 block_time: std::time::Duration,
122 max_wait: std::time::Duration,
123 max_messages: Option<usize>,
124 max_missed_blocks: u64,
125}
126
127#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
128#[serde(tag = "status", rename_all = "lowercase")]
129pub enum SynchronizerState {
130 Started,
131 Ready(Header),
132 Stale(Header),
134 Delayed(Header),
135 Advanced(Header),
139 Ended,
140}
141
142pub struct SynchronizerStream {
143 extractor_id: ExtractorIdentity,
144 state: SynchronizerState,
145 modify_ts: NaiveDateTime,
146 rx: Receiver<StateSyncMessage>,
147}
148
149impl SynchronizerStream {
150 async fn try_advance(
151 &mut self,
152 block_history: &BlockHistory,
153 max_wait: std::time::Duration,
154 stale_threshold: std::time::Duration,
155 ) -> BlockSyncResult<Option<StateSyncMessage>> {
156 let extractor_id = self.extractor_id.clone();
157 let latest_block = block_history.latest();
158 match &self.state {
159 SynchronizerState::Started | SynchronizerState::Ended => {
160 warn!(state=?&self.state, "Advancing Synchronizer in this state not supported!");
161 Ok(None)
162 }
163 SynchronizerState::Advanced(b) => {
164 let future_block = b.clone();
165 self.transition(future_block, block_history, stale_threshold)?;
167 Ok(None)
168 }
169 SynchronizerState::Ready(previous_block) => {
170 self.try_recv_next_expected(
172 max_wait,
173 block_history,
174 previous_block.clone(),
175 stale_threshold,
176 )
177 .await
178 }
180 SynchronizerState::Delayed(old_block) => {
181 debug!(
183 ?old_block,
184 ?latest_block,
185 %extractor_id,
186 "Trying to catch up to latest block"
187 );
188 self.try_catch_up(block_history, max_wait, stale_threshold)
189 .await
190 }
191 SynchronizerState::Stale(old_block) => {
192 debug!(
193 ?old_block,
194 ?latest_block,
195 %extractor_id,
196 "Trying to catch up to latest block"
197 );
198 self.try_catch_up(block_history, max_wait, stale_threshold)
199 .await
200 }
201 }
202 }
203
204 async fn try_recv_next_expected(
209 &mut self,
210 max_wait: std::time::Duration,
211 block_history: &BlockHistory,
212 previous_block: Header,
213 stale_threshold: std::time::Duration,
214 ) -> BlockSyncResult<Option<StateSyncMessage>> {
215 let extractor_id = self.extractor_id.clone();
216 match timeout(max_wait, self.rx.recv()).await {
217 Ok(Some(msg)) => {
218 self.transition(msg.header.clone(), block_history, stale_threshold)?;
219 Ok(Some(msg))
220 }
221 Ok(None) => {
222 error!(
223 %extractor_id,
224 ?previous_block,
225 "Extractor terminated: channel closed!"
226 );
227 self.state = SynchronizerState::Ended;
228 self.modify_ts = Local::now().naive_utc();
229 Ok(None)
230 }
231 Err(_) => {
232 debug!(%extractor_id, ?previous_block, "Extractor did not check in within time.");
234 self.state = SynchronizerState::Delayed(previous_block.clone());
235 Ok(None)
236 }
237 }
238 }
239
240 async fn try_catch_up(
246 &mut self,
247 block_history: &BlockHistory,
248 max_wait: std::time::Duration,
249 stale_threshold: std::time::Duration,
250 ) -> BlockSyncResult<Option<StateSyncMessage>> {
251 let mut results = Vec::new();
252 let extractor_id = self.extractor_id.clone();
253
254 let deadline = std::time::Instant::now() + max_wait;
256
257 while std::time::Instant::now() < deadline {
258 match timeout(
259 deadline.saturating_duration_since(std::time::Instant::now()),
260 self.rx.recv(),
261 )
262 .await
263 {
264 Ok(Some(msg)) => {
265 debug!(%extractor_id, block_num=?msg.header.number, "Received new message during catch-up");
266 let block_pos = block_history.determine_block_position(&msg.header)?;
267 results.push(msg);
268 if matches!(block_pos, BlockPosition::NextExpected) {
269 break;
270 }
271 }
272 Ok(None) => {
273 warn!(%extractor_id, "Channel closed during catch-up");
274 self.state = SynchronizerState::Ended;
275 return Ok(None);
276 }
277 Err(_) => {
278 debug!(%extractor_id, "Timed out waiting for catch-up");
279 break;
280 }
281 }
282 }
283
284 let merged = results
285 .into_iter()
286 .reduce(|l, r| l.merge(r));
287
288 if let Some(msg) = merged {
289 debug!(?extractor_id, "Delayed extractor made progress!");
291 self.transition(msg.header.clone(), block_history, stale_threshold)?;
292 Ok(Some(msg))
293 } else {
294 Ok(None)
295 }
296 }
297
298 fn transition(
305 &mut self,
306 latest_retrieved: Header,
307 block_history: &BlockHistory,
308 stale_threshold: std::time::Duration,
309 ) -> Result<(), BlockSynchronizerError> {
310 let extractor_id = self.extractor_id.clone();
311 let last_message_at = self.modify_ts;
312 let block = &latest_retrieved;
313
314 match block_history.determine_block_position(&latest_retrieved)? {
315 BlockPosition::NextExpected => {
316 self.state = SynchronizerState::Ready(latest_retrieved.clone());
317 }
318 BlockPosition::Latest | BlockPosition::Delayed => {
319 let now = Local::now().naive_utc();
320 let wait_duration = self
321 .modify_ts
322 .signed_duration_since(now);
323 let stale_threshold_chrono = ChronoDuration::from_std(stale_threshold)
324 .map_err(|e| BlockSynchronizerError::DurationConversionError(e.to_string()))?;
325 if wait_duration > stale_threshold_chrono {
326 warn!(
327 ?extractor_id,
328 ?last_message_at,
329 ?block,
330 "Extractor transition to stale."
331 );
332 self.state = SynchronizerState::Stale(latest_retrieved.clone());
333 } else {
334 warn!(
335 ?extractor_id,
336 ?last_message_at,
337 ?block,
338 "Extractor transition to delayed."
339 );
340 self.state = SynchronizerState::Delayed(latest_retrieved.clone());
341 }
342 }
343 BlockPosition::Advanced => {
344 error!(
345 ?extractor_id,
346 ?last_message_at,
347 ?block,
348 "Extractor transition to advanced."
349 );
350 self.state = SynchronizerState::Advanced(latest_retrieved.clone());
351 }
352 }
353 self.modify_ts = Local::now().naive_utc();
354 Ok(())
355 }
356}
357
358#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
359pub struct FeedMessage {
360 pub state_msgs: HashMap<String, StateSyncMessage>,
361 pub sync_states: HashMap<String, SynchronizerState>,
362}
363
364impl FeedMessage {
365 fn new(
366 state_msgs: HashMap<String, StateSyncMessage>,
367 sync_states: HashMap<String, SynchronizerState>,
368 ) -> Self {
369 Self { state_msgs, sync_states }
370 }
371}
372
373impl<S> BlockSynchronizer<S>
374where
375 S: StateSynchronizer,
376{
377 pub fn new(
378 block_time: std::time::Duration,
379 max_wait: std::time::Duration,
380 max_missed_blocks: u64,
381 ) -> Self {
382 Self { synchronizers: None, max_messages: None, block_time, max_wait, max_missed_blocks }
383 }
384
385 pub fn max_messages(&mut self, val: usize) {
386 self.max_messages = Some(val);
387 }
388
389 pub fn register_synchronizer(mut self, id: ExtractorIdentity, synchronizer: S) -> Self {
390 let mut registered = self.synchronizers.unwrap_or_default();
391 registered.insert(id, synchronizer);
392 self.synchronizers = Some(registered);
393 self
394 }
395 pub async fn run(mut self) -> BlockSyncResult<(JoinHandle<()>, Receiver<FeedMessage>)> {
396 trace!("Starting BlockSynchronizer...");
397 let mut state_sync_tasks = FuturesUnordered::new();
398 let mut synchronizers = self
399 .synchronizers
400 .take()
401 .ok_or(BlockSynchronizerError::NoSynchronizers)?;
402 let init_tasks = synchronizers
404 .values()
405 .map(|s| s.initialize())
406 .collect::<Vec<_>>();
407 try_join_all(init_tasks).await?;
408
409 let mut sync_streams = HashMap::with_capacity(synchronizers.len());
410 let mut sync_handles = Vec::new();
411 for (extractor_id, synchronizer) in synchronizers.drain() {
412 let (jh, rx) = synchronizer.start().await?;
413 state_sync_tasks.push(jh);
414 sync_handles.push(synchronizer);
415
416 sync_streams.insert(
417 extractor_id.clone(),
418 SynchronizerStream {
419 extractor_id,
420 state: SynchronizerState::Started,
421 modify_ts: Local::now().naive_utc(),
422 rx,
423 },
424 );
425 }
426
427 debug!("Waiting for initial synchronizer messages...");
430 let mut startup_futures = Vec::new();
431 for (id, sh) in sync_streams.iter_mut() {
432 let fut = async {
433 let res = timeout(self.block_time + self.max_wait, sh.rx.recv()).await;
434 (id.clone(), res)
435 };
436 startup_futures.push(fut);
437 }
438 let mut ready_sync_msgs = HashMap::new();
439 let initial_headers = join_all(startup_futures)
440 .await
441 .into_iter()
442 .filter_map(|(extractor_id, res)| {
443 let synchronizer = sync_streams
444 .get_mut(&extractor_id)
445 .unwrap();
446 match res {
447 Ok(Some(msg)) => {
448 debug!(%extractor_id, height=?&msg.header.number, "Synchronizer started successfully!");
449 synchronizer.state = SynchronizerState::Ready(msg.header.clone());
451 synchronizer.modify_ts = Local::now().naive_utc();
452 ready_sync_msgs.insert(extractor_id.name.clone(), msg.clone());
453 Some(msg.header)
454 }
455 Ok(None) => {
456 warn!(%extractor_id, "Dead synchronizer at startup will be purged!");
457 synchronizer.state = SynchronizerState::Ended;
458 synchronizer.modify_ts = Local::now().naive_utc();
459 None
460 }
461 Err(_) => {
462 warn!(%extractor_id, "Timed out waiting for first message: Stale synchronizer at startup will be purged!");
463 synchronizer.state = SynchronizerState::Stale(Header::default());
464 synchronizer.modify_ts = Local::now().naive_utc();
465 None
466 }
467 }
468 })
469 .collect::<HashSet<_>>() .into_iter()
471 .collect::<Vec<_>>();
472
473 let mut block_history = BlockHistory::new(initial_headers, 15)?;
474
475 let start_header = block_history
477 .latest()
478 .ok_or(BlockSynchronizerError::NoReadySynchronizers)?;
479 info!(
480 start_block=?start_header,
481 n_healthy=?ready_sync_msgs.len(),
482 "Block synchronization started successfully!"
483 );
484
485 sync_streams.retain(|_, v| matches!(v.state, SynchronizerState::Ready(_)));
490
491 for (_, stream) in sync_streams.iter_mut() {
494 if let SynchronizerState::Ready(header) = &stream.state.clone() {
495 if header.number < start_header.number {
496 stream.state = SynchronizerState::Delayed(header.clone());
497 debug!(
498 extractor_id=%stream.extractor_id,
499 synchronizer_block=?header.number,
500 current_block=?start_header.number,
501 "Marking synchronizer as delayed during initialization"
502 );
503 }
504 }
505 }
506
507 let (sync_tx, sync_rx) = mpsc::channel(30);
508 let main_loop_jh: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
509 let mut n_iter = 1;
510 loop {
511 sync_tx
513 .send(FeedMessage::new(
514 std::mem::take(&mut ready_sync_msgs),
515 sync_streams
516 .iter()
517 .map(|(a, b)| (a.name.to_string(), b.state.clone()))
518 .collect(),
519 ))
520 .await?;
521
522 if let Some(max_messages) = self.max_messages {
524 if n_iter >= max_messages {
525 info!(max_messages, "StreamEnd");
526 return Ok(());
527 }
528 }
529 n_iter += 1;
530
531 let mut recv_futures = Vec::new();
541 for (extractor_id, sh) in sync_streams.iter_mut() {
542 recv_futures.push(async {
543 let res = sh
544 .try_advance(
545 &block_history,
546 self.block_time + self.max_wait,
547 self.block_time
548 .mul_f64(self.max_missed_blocks as f64),
549 )
550 .await?;
551 Ok::<_, BlockSynchronizerError>(
552 res.map(|msg| (extractor_id.name.clone(), msg)),
553 )
554 });
555 }
556 ready_sync_msgs.extend(
557 join_all(recv_futures)
558 .await
559 .into_iter()
560 .collect::<Result<Vec<_>, _>>()?
561 .into_iter()
562 .flatten(),
563 );
564
565 sync_streams.retain(|_, v| match v.state {
568 SynchronizerState::Started | SynchronizerState::Ended => false,
569 SynchronizerState::Stale(_) => false,
570 SynchronizerState::Ready(_) => true,
571 SynchronizerState::Delayed(_) => true,
572 SynchronizerState::Advanced(_) => true,
573 });
574
575 block_history.push(
576 sync_streams
577 .values()
578 .filter_map(|v| match &v.state {
579 SynchronizerState::Ready(b) => Some(b),
580 _ => None,
581 })
582 .next()
583 .ok_or(BlockSynchronizerError::NoReadySynchronizers)?
585 .clone(),
586 )?;
587 }
588 });
589
590 let nanny_jh = tokio::spawn(async move {
591 select! {
592 error = state_sync_tasks.select_next_some() => {
593 for s in sync_handles.iter_mut() {
594 if let Err(e) = s.close().await {
595 error!(error=?e, "Failed to close synchronizer: was not started!");
596 }
597 }
598 error!(?error, "State synchronizer exited");
599 },
600 error = main_loop_jh => {
601 error!(?error, "Feed main loop exited");
602 }
603 }
604 });
605 Ok((nanny_jh, sync_rx))
606 }
607}
608
609#[cfg(test)]
610mod tests {
611 use std::sync::Arc;
612
613 use async_trait::async_trait;
614 use test_log::test;
615 use tokio::sync::{oneshot, Mutex};
616 use tycho_common::dto::Chain;
617
618 use super::{synchronizer::SynchronizerError, *};
619 use crate::feed::synchronizer::SyncResult;
620
621 #[derive(Clone)]
622 struct MockStateSync {
623 header_tx: mpsc::Sender<StateSyncMessage>,
624 header_rx: Arc<Mutex<Option<Receiver<StateSyncMessage>>>>,
625 end_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
626 }
627
628 impl MockStateSync {
629 fn new() -> Self {
630 let (tx, rx) = mpsc::channel(1);
631 Self {
632 header_tx: tx,
633 header_rx: Arc::new(Mutex::new(Some(rx))),
634 end_tx: Arc::new(Mutex::new(None)),
635 }
636 }
637 async fn send_header(&self, header: StateSyncMessage) {
638 self.header_tx
639 .send(header)
640 .await
641 .expect("sending header failed");
642 }
643 }
644
645 #[async_trait]
646 impl StateSynchronizer for MockStateSync {
647 async fn initialize(&self) -> SyncResult<()> {
648 Ok(())
649 }
650
651 async fn start(
652 &self,
653 ) -> SyncResult<(JoinHandle<SyncResult<()>>, Receiver<StateSyncMessage>)> {
654 let block_rx = {
655 let mut guard = self.header_rx.lock().await;
656 guard
657 .take()
658 .expect("Block receiver was not set!")
659 };
660
661 let end_rx = {
662 let (end_tx, end_rx) = oneshot::channel();
663 let mut guard = self.end_tx.lock().await;
664 *guard = Some(end_tx);
665 end_rx
666 };
667
668 let jh = tokio::spawn(async move {
669 let _ = end_rx.await;
670 SyncResult::Ok(())
671 });
672
673 Ok((jh, block_rx))
674 }
675
676 async fn close(&mut self) -> SyncResult<()> {
677 let mut guard = self.end_tx.lock().await;
678 if let Some(tx) = guard.take() {
679 tx.send(())
680 .expect("end channel closed!");
681 Ok(())
682 } else {
683 Err(SynchronizerError::CloseError("Not Started".to_string()))
684 }
685 }
686 }
687
688 #[test(tokio::test)]
689 async fn test_two_ready_synchronizers() {
690 let v2_sync = MockStateSync::new();
691 let v3_sync = MockStateSync::new();
692 let block_sync = BlockSynchronizer::new(
693 std::time::Duration::from_millis(500),
694 std::time::Duration::from_millis(50),
695 10,
696 )
697 .register_synchronizer(
698 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
699 v2_sync.clone(),
700 )
701 .register_synchronizer(
702 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
703 v3_sync.clone(),
704 );
705 let start_msg = StateSyncMessage {
706 header: Header { number: 1, ..Default::default() },
707 ..Default::default()
708 };
709 v2_sync
710 .send_header(start_msg.clone())
711 .await;
712 v3_sync
713 .send_header(start_msg.clone())
714 .await;
715
716 let (_jh, mut rx) = block_sync
717 .run()
718 .await
719 .expect("BlockSynchronizer failed to start.");
720 let first_feed_msg = rx
721 .recv()
722 .await
723 .expect("header channel was closed");
724 let second_msg = StateSyncMessage {
725 header: Header { number: 2, ..Default::default() },
726 ..Default::default()
727 };
728 v2_sync
729 .send_header(second_msg.clone())
730 .await;
731 v3_sync
732 .send_header(second_msg.clone())
733 .await;
734 let second_feed_msg = rx
735 .recv()
736 .await
737 .expect("header channel was closed!");
738
739 let exp1 = FeedMessage {
740 state_msgs: [
741 ("uniswap-v2".to_string(), start_msg.clone()),
742 ("uniswap-v3".to_string(), start_msg.clone()),
743 ]
744 .into_iter()
745 .collect(),
746 sync_states: [
747 ("uniswap-v3".to_string(), SynchronizerState::Ready(start_msg.header.clone())),
748 ("uniswap-v2".to_string(), SynchronizerState::Ready(start_msg.header.clone())),
749 ]
750 .into_iter()
751 .collect(),
752 };
753 let exp2 = FeedMessage {
754 state_msgs: [
755 ("uniswap-v2".to_string(), second_msg.clone()),
756 ("uniswap-v3".to_string(), second_msg.clone()),
757 ]
758 .into_iter()
759 .collect(),
760 sync_states: [
761 ("uniswap-v3".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
762 ("uniswap-v2".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
763 ]
764 .into_iter()
765 .collect(),
766 };
767 assert_eq!(first_feed_msg, exp1);
768 assert_eq!(second_feed_msg, exp2);
769 }
770
771 #[test(tokio::test)]
772 async fn test_delayed_synchronizer_catches_up() {
773 let v2_sync = MockStateSync::new();
774 let v3_sync = MockStateSync::new();
775 let block_sync = BlockSynchronizer::new(
776 std::time::Duration::from_millis(500),
777 std::time::Duration::from_millis(50),
778 10,
779 )
780 .register_synchronizer(
781 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
782 v2_sync.clone(),
783 )
784 .register_synchronizer(
785 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
786 v3_sync.clone(),
787 );
788
789 let block1_msg = StateSyncMessage {
791 header: Header {
792 number: 1,
793 hash: Bytes::from(vec![1]),
794 parent_hash: Bytes::from(vec![0]),
795 revert: false,
796 ..Default::default()
797 },
798 ..Default::default()
799 };
800 v2_sync
801 .send_header(block1_msg.clone())
802 .await;
803 v3_sync
804 .send_header(block1_msg.clone())
805 .await;
806
807 let (_jh, mut rx) = block_sync
809 .run()
810 .await
811 .expect("BlockSynchronizer failed to start.");
812
813 let first_feed_msg = rx
815 .recv()
816 .await
817 .expect("header channel was closed");
818 assert_eq!(first_feed_msg.state_msgs.len(), 2);
819 assert!(matches!(
820 first_feed_msg
821 .sync_states
822 .get("uniswap-v2")
823 .unwrap(),
824 SynchronizerState::Ready(_)
825 ));
826 assert!(matches!(
827 first_feed_msg
828 .sync_states
829 .get("uniswap-v3")
830 .unwrap(),
831 SynchronizerState::Ready(_)
832 ));
833
834 let block2_msg = StateSyncMessage {
836 header: Header {
837 number: 2,
838 hash: Bytes::from(vec![2]),
839 parent_hash: Bytes::from(vec![1]),
840 revert: false,
841 ..Default::default()
842 },
843 ..Default::default()
844 };
845 v2_sync
846 .send_header(block2_msg.clone())
847 .await;
848
849 let second_feed_msg = rx
851 .recv()
852 .await
853 .expect("header channel was closed");
854 assert!(second_feed_msg
855 .state_msgs
856 .contains_key("uniswap-v2"));
857 assert!(matches!(
858 second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
859 SynchronizerState::Ready(header) if header.number == 2
860 ));
861 assert!(!second_feed_msg
862 .state_msgs
863 .contains_key("uniswap-v3"));
864 assert!(matches!(
865 second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
866 SynchronizerState::Delayed(header) if header.number == 1
867 ));
868
869 v3_sync
871 .send_header(block2_msg.clone())
872 .await;
873
874 let block3_msg = StateSyncMessage {
876 header: Header {
877 number: 3,
878 hash: Bytes::from(vec![3]),
879 parent_hash: Bytes::from(vec![2]),
880 revert: false,
881 ..Default::default()
882 },
883 ..Default::default()
884 };
885 v2_sync
886 .send_header(block3_msg.clone())
887 .await;
888 v3_sync.send_header(block3_msg).await;
889
890 let third_feed_msg = rx
892 .recv()
893 .await
894 .expect("header channel was closed");
895 assert!(third_feed_msg
896 .state_msgs
897 .contains_key("uniswap-v2"));
898 assert!(third_feed_msg
899 .state_msgs
900 .contains_key("uniswap-v3"));
901 assert!(matches!(
902 third_feed_msg.sync_states.get("uniswap-v2").unwrap(),
903 SynchronizerState::Ready(header) if header.number == 3
904 ));
905 assert!(matches!(
906 third_feed_msg.sync_states.get("uniswap-v3").unwrap(),
907 SynchronizerState::Ready(header) if header.number == 3
908 ));
909 }
910
911 #[test(tokio::test)]
912 async fn test_different_start_blocks() {
913 let v2_sync = MockStateSync::new();
914 let v3_sync = MockStateSync::new();
915 let block_sync = BlockSynchronizer::new(
916 std::time::Duration::from_millis(500),
917 std::time::Duration::from_millis(50),
918 10,
919 )
920 .register_synchronizer(
921 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
922 v2_sync.clone(),
923 )
924 .register_synchronizer(
925 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
926 v3_sync.clone(),
927 );
928
929 let block1_msg = StateSyncMessage {
931 header: Header {
932 number: 1,
933 hash: Bytes::from(vec![1]),
934 parent_hash: Bytes::from(vec![0]),
935 revert: false,
936 ..Default::default()
937 },
938 ..Default::default()
939 };
940 let block2_msg = StateSyncMessage {
941 header: Header {
942 number: 2,
943 hash: Bytes::from(vec![2]),
944 parent_hash: Bytes::from(vec![1]),
945 revert: false,
946 ..Default::default()
947 },
948 ..Default::default()
949 };
950
951 v2_sync
952 .send_header(block1_msg.clone())
953 .await;
954 v3_sync
955 .send_header(block2_msg.clone())
956 .await;
957
958 let (_jh, mut rx) = block_sync
960 .run()
961 .await
962 .expect("BlockSynchronizer failed to start.");
963
964 let first_feed_msg = rx
966 .recv()
967 .await
968 .expect("header channel was closed");
969 assert!(matches!(
970 first_feed_msg.sync_states.get("uniswap-v2").unwrap(),
971 SynchronizerState::Delayed(header) if header.number == 1
972 ));
973 assert!(matches!(
974 first_feed_msg.sync_states.get("uniswap-v3").unwrap(),
975 SynchronizerState::Ready(header) if header.number == 2
976 ));
977
978 v2_sync
980 .send_header(block2_msg.clone())
981 .await;
982
983 let block3_msg = StateSyncMessage {
985 header: Header {
986 number: 3,
987 hash: Bytes::from(vec![3]),
988 parent_hash: Bytes::from(vec![2]),
989 revert: false,
990 ..Default::default()
991 },
992 ..Default::default()
993 };
994 v2_sync
995 .send_header(block3_msg.clone())
996 .await;
997 v3_sync
998 .send_header(block3_msg.clone())
999 .await;
1000
1001 let second_feed_msg = rx
1003 .recv()
1004 .await
1005 .expect("header channel was closed");
1006 assert_eq!(second_feed_msg.state_msgs.len(), 2);
1007 assert!(matches!(
1008 second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1009 SynchronizerState::Ready(header) if header.number == 3
1010 ));
1011 assert!(matches!(
1012 second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1013 SynchronizerState::Ready(header) if header.number == 3
1014 ));
1015 }
1016}