tycho_client/feed/
mod.rs

1use std::collections::{HashMap, HashSet};
2
3use chrono::{Duration as ChronoDuration, Local, NaiveDateTime};
4use futures03::{
5    future::{join_all, try_join_all},
6    stream::FuturesUnordered,
7    StreamExt,
8};
9use serde::{Deserialize, Serialize};
10use thiserror::Error;
11use tokio::{
12    select,
13    sync::mpsc::{self, Receiver},
14    task::JoinHandle,
15    time::timeout,
16};
17use tracing::{debug, error, info, trace, warn};
18use tycho_common::{
19    dto::{Block, ExtractorIdentity},
20    Bytes,
21};
22
23use crate::feed::{
24    block_history::{BlockHistory, BlockHistoryError, BlockPosition},
25    synchronizer::{StateSyncMessage, StateSynchronizer, SynchronizerError},
26};
27
28mod block_history;
29pub mod component_tracker;
30pub mod synchronizer;
31
32#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, Eq, Hash)]
33pub struct Header {
34    pub hash: Bytes,
35    pub number: u64,
36    pub parent_hash: Bytes,
37    pub revert: bool,
38    pub timestamp: u64,
39}
40
41impl Header {
42    fn from_block(block: &Block, revert: bool) -> Self {
43        Self {
44            hash: block.hash.clone(),
45            number: block.number,
46            parent_hash: block.parent_hash.clone(),
47            revert,
48            timestamp: block.ts.timestamp() as u64,
49        }
50    }
51}
52
53#[derive(Error, Debug)]
54pub enum BlockSynchronizerError {
55    #[error("Failed to initialize synchronizer: {0}")]
56    InitializationError(#[from] SynchronizerError),
57
58    #[error("Failed to process new block: {0}")]
59    BlockHistoryError(#[from] BlockHistoryError),
60
61    #[error("Not a single synchronizer was ready")]
62    NoReadySynchronizers,
63
64    #[error("No synchronizers were set")]
65    NoSynchronizers,
66
67    #[error("Failed to convert duration: {0}")]
68    DurationConversionError(String),
69}
70
71type BlockSyncResult<T> = Result<T, BlockSynchronizerError>;
72
73/// Aligns multiple StateSynchronizers on the block dimension.
74///
75/// ## Purpose
76/// The purpose of this component is to handle streams from multiple state synchronizers and
77/// align/merge them according to their blocks. Ideally this should be done in a fault-tolerant way,
78/// meaning we can recover from a state synchronizer suffering from timing issues. E.g. a delayed or
79/// unresponsive state synchronizer might recover again, or an advanced state synchronizer can be
80/// included again once we reach the block it is at.
81///
82/// ## Limitations
83/// - Supports only chains with fixed blocks time for now due to the lock step mechanism.
84///
85/// ## Initialisation
86/// Queries all registered synchronizers for their first message and evaluates the state of each
87/// synchronizer. If a synchronizer's first message is an older block, it is marked as delayed.
88/// If no message is received within the startup timeout, the synchronizer is marked as stale and is
89/// closed.
90///
91/// ## Main loop
92/// Once started, the synchronizers are queried concurrently for messages in lock step:
93/// the main loop queries all synchronizers in ready for the last emitted data, builds the
94/// `FeedMessage` and emits it, then it schedules the wait procedure for the next block.
95///
96/// ## Synchronization Logic
97///
98/// To classify a synchronizer as delayed, we need to first define the current block. The highest
99/// block number of all ready synchronizers is considered the current block.
100///
101/// Once we have the current block we can easily determine which block we expect next. And if a
102/// synchronizer delivers an older block we can classify it as delayed.
103///
104/// If any synchronizer is not in the ready state we will try to bring it back to the ready state.
105/// This is done by trying to empty any buffers of a delayed synchronizer or waiting to reach
106/// the height of an advanced synchronizer (and flagging it as such in the meantime).
107///
108/// Of course, we can't wait forever for a synchronizer to reply/recover. All of this must happen
109/// within the block production step of the blockchain:
110/// The wait procedure consists of waiting for any of the receivers to emit a new message (within a
111/// max timeout - several multiples of the block time). Once a message is received a very short
112/// timeout start for the remaining synchronizers, to deliver a message. Any synchronizer failing to
113/// do so is transitioned to delayed.
114///
115/// ### Note
116/// The described process above is the goal. It is currently not implemented like that. Instead we
117/// simply wait `block_time` + `wait_time`. Synchronizers are expected to respond within that
118/// timeout. This is simpler but only works well on chains with fixed block times.
119pub struct BlockSynchronizer<S> {
120    synchronizers: Option<HashMap<ExtractorIdentity, S>>,
121    block_time: std::time::Duration,
122    max_wait: std::time::Duration,
123    max_messages: Option<usize>,
124    max_missed_blocks: u64,
125}
126
127#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
128#[serde(tag = "status", rename_all = "lowercase")]
129pub enum SynchronizerState {
130    Started,
131    Ready(Header),
132    // no progress, we consider it stale at that point we should purge it.
133    Stale(Header),
134    Delayed(Header),
135    // For this to happen we must have a gap, and a gap usually means a new snapshot from the
136    // StateSynchronizer. This can only happen if we are processing too slow and one of the
137    // synchronizers restarts e.g. because Tycho ended the subscription.
138    Advanced(Header),
139    Ended,
140}
141
142pub struct SynchronizerStream {
143    extractor_id: ExtractorIdentity,
144    state: SynchronizerState,
145    modify_ts: NaiveDateTime,
146    rx: Receiver<StateSyncMessage>,
147}
148
149impl SynchronizerStream {
150    async fn try_advance(
151        &mut self,
152        block_history: &BlockHistory,
153        max_wait: std::time::Duration,
154        stale_threshold: std::time::Duration,
155    ) -> BlockSyncResult<Option<StateSyncMessage>> {
156        let extractor_id = self.extractor_id.clone();
157        let latest_block = block_history.latest();
158        match &self.state {
159            SynchronizerState::Started | SynchronizerState::Ended => {
160                warn!(state=?&self.state, "Advancing Synchronizer in this state not supported!");
161                Ok(None)
162            }
163            SynchronizerState::Advanced(b) => {
164                let future_block = b.clone();
165                // Transition to ready once we arrived at the expected height
166                self.transition(future_block, block_history, stale_threshold)?;
167                Ok(None)
168            }
169            SynchronizerState::Ready(previous_block) => {
170                // Try to recv the next expected block, update state accordingly.
171                self.try_recv_next_expected(
172                    max_wait,
173                    block_history,
174                    previous_block.clone(),
175                    stale_threshold,
176                )
177                .await
178                // TODO: if we entered advanced state we need to buffer the message for a while.
179            }
180            SynchronizerState::Delayed(old_block) => {
181                // try to catch up all currently queued blocks until the expected block
182                debug!(
183                    ?old_block,
184                    ?latest_block,
185                    %extractor_id,
186                    "Trying to catch up to latest block"
187                );
188                self.try_catch_up(block_history, max_wait, stale_threshold)
189                    .await
190            }
191            SynchronizerState::Stale(old_block) => {
192                debug!(
193                    ?old_block,
194                    ?latest_block,
195                    %extractor_id,
196                    "Trying to catch up to latest block"
197                );
198                self.try_catch_up(block_history, max_wait, stale_threshold)
199                    .await
200            }
201        }
202    }
203
204    /// Standard way to advance a well-behaved state synchronizer.
205    ///
206    /// Will wait for a new block on the synchronizer within a timeout. And modify its state based
207    /// on the outcome.
208    async fn try_recv_next_expected(
209        &mut self,
210        max_wait: std::time::Duration,
211        block_history: &BlockHistory,
212        previous_block: Header,
213        stale_threshold: std::time::Duration,
214    ) -> BlockSyncResult<Option<StateSyncMessage>> {
215        let extractor_id = self.extractor_id.clone();
216        match timeout(max_wait, self.rx.recv()).await {
217            Ok(Some(msg)) => {
218                self.transition(msg.header.clone(), block_history, stale_threshold)?;
219                Ok(Some(msg))
220            }
221            Ok(None) => {
222                error!(
223                    %extractor_id,
224                    ?previous_block,
225                    "Extractor terminated: channel closed!"
226                );
227                self.state = SynchronizerState::Ended;
228                self.modify_ts = Local::now().naive_utc();
229                Ok(None)
230            }
231            Err(_) => {
232                // trying to advance a block timed out
233                debug!(%extractor_id, ?previous_block, "Extractor did not check in within time.");
234                self.state = SynchronizerState::Delayed(previous_block.clone());
235                Ok(None)
236            }
237        }
238    }
239
240    /// Tries to catch up a delayed state synchronizer.
241    ///
242    /// If a synchronizer is delayed, this method will try to catch up to the next expected block
243    /// by consuming all waiting messages in its queue and waiting for any new block messages
244    /// within a timeout. Finally, all update messages are merged into one and returned.
245    async fn try_catch_up(
246        &mut self,
247        block_history: &BlockHistory,
248        max_wait: std::time::Duration,
249        stale_threshold: std::time::Duration,
250    ) -> BlockSyncResult<Option<StateSyncMessage>> {
251        let mut results = Vec::new();
252        let extractor_id = self.extractor_id.clone();
253
254        // Set a deadline for the overall catch-up operation
255        let deadline = std::time::Instant::now() + max_wait;
256
257        while std::time::Instant::now() < deadline {
258            match timeout(
259                deadline.saturating_duration_since(std::time::Instant::now()),
260                self.rx.recv(),
261            )
262            .await
263            {
264                Ok(Some(msg)) => {
265                    debug!(%extractor_id, block_num=?msg.header.number, "Received new message during catch-up");
266                    let block_pos = block_history.determine_block_position(&msg.header)?;
267                    results.push(msg);
268                    if matches!(block_pos, BlockPosition::NextExpected) {
269                        break;
270                    }
271                }
272                Ok(None) => {
273                    warn!(%extractor_id, "Channel closed during catch-up");
274                    self.state = SynchronizerState::Ended;
275                    return Ok(None);
276                }
277                Err(_) => {
278                    debug!(%extractor_id, "Timed out waiting for catch-up");
279                    break;
280                }
281            }
282        }
283
284        let merged = results
285            .into_iter()
286            .reduce(|l, r| l.merge(r));
287
288        if let Some(msg) = merged {
289            // we were able to get at least one block out
290            debug!(?extractor_id, "Delayed extractor made progress!");
291            self.transition(msg.header.clone(), block_history, stale_threshold)?;
292            Ok(Some(msg))
293        } else {
294            Ok(None)
295        }
296    }
297
298    /// Logic to transition a state synchronizer based on newly received block
299    ///
300    /// Updates the synchronizer's state according to the position of the received block:
301    /// - Next expected block -> Ready state
302    /// - Latest/Delayed block -> Either Delayed or Stale (if >60s since last update)
303    /// - Advanced block -> Advanced state (block ahead of expected position)
304    fn transition(
305        &mut self,
306        latest_retrieved: Header,
307        block_history: &BlockHistory,
308        stale_threshold: std::time::Duration,
309    ) -> Result<(), BlockSynchronizerError> {
310        let extractor_id = self.extractor_id.clone();
311        let last_message_at = self.modify_ts;
312        let block = &latest_retrieved;
313
314        match block_history.determine_block_position(&latest_retrieved)? {
315            BlockPosition::NextExpected => {
316                self.state = SynchronizerState::Ready(latest_retrieved.clone());
317            }
318            BlockPosition::Latest | BlockPosition::Delayed => {
319                let now = Local::now().naive_utc();
320                let wait_duration = self
321                    .modify_ts
322                    .signed_duration_since(now);
323                let stale_threshold_chrono = ChronoDuration::from_std(stale_threshold)
324                    .map_err(|e| BlockSynchronizerError::DurationConversionError(e.to_string()))?;
325                if wait_duration > stale_threshold_chrono {
326                    warn!(
327                        ?extractor_id,
328                        ?last_message_at,
329                        ?block,
330                        "Extractor transition to stale."
331                    );
332                    self.state = SynchronizerState::Stale(latest_retrieved.clone());
333                } else {
334                    warn!(
335                        ?extractor_id,
336                        ?last_message_at,
337                        ?block,
338                        "Extractor transition to delayed."
339                    );
340                    self.state = SynchronizerState::Delayed(latest_retrieved.clone());
341                }
342            }
343            BlockPosition::Advanced => {
344                error!(
345                    ?extractor_id,
346                    ?last_message_at,
347                    ?block,
348                    "Extractor transition to advanced."
349                );
350                self.state = SynchronizerState::Advanced(latest_retrieved.clone());
351            }
352        }
353        self.modify_ts = Local::now().naive_utc();
354        Ok(())
355    }
356}
357
358#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
359pub struct FeedMessage {
360    pub state_msgs: HashMap<String, StateSyncMessage>,
361    pub sync_states: HashMap<String, SynchronizerState>,
362}
363
364impl FeedMessage {
365    fn new(
366        state_msgs: HashMap<String, StateSyncMessage>,
367        sync_states: HashMap<String, SynchronizerState>,
368    ) -> Self {
369        Self { state_msgs, sync_states }
370    }
371}
372
373impl<S> BlockSynchronizer<S>
374where
375    S: StateSynchronizer,
376{
377    pub fn new(
378        block_time: std::time::Duration,
379        max_wait: std::time::Duration,
380        max_missed_blocks: u64,
381    ) -> Self {
382        Self { synchronizers: None, max_messages: None, block_time, max_wait, max_missed_blocks }
383    }
384
385    pub fn max_messages(&mut self, val: usize) {
386        self.max_messages = Some(val);
387    }
388
389    pub fn register_synchronizer(mut self, id: ExtractorIdentity, synchronizer: S) -> Self {
390        let mut registered = self.synchronizers.unwrap_or_default();
391        registered.insert(id, synchronizer);
392        self.synchronizers = Some(registered);
393        self
394    }
395    pub async fn run(mut self) -> BlockSyncResult<(JoinHandle<()>, Receiver<FeedMessage>)> {
396        trace!("Starting BlockSynchronizer...");
397        let mut state_sync_tasks = FuturesUnordered::new();
398        let mut synchronizers = self
399            .synchronizers
400            .take()
401            .ok_or(BlockSynchronizerError::NoSynchronizers)?;
402        // init synchronizers
403        let init_tasks = synchronizers
404            .values()
405            .map(|s| s.initialize())
406            .collect::<Vec<_>>();
407        try_join_all(init_tasks).await?;
408
409        let mut sync_streams = HashMap::with_capacity(synchronizers.len());
410        let mut sync_handles = Vec::new();
411        for (extractor_id, synchronizer) in synchronizers.drain() {
412            let (jh, rx) = synchronizer.start().await?;
413            state_sync_tasks.push(jh);
414            sync_handles.push(synchronizer);
415
416            sync_streams.insert(
417                extractor_id.clone(),
418                SynchronizerStream {
419                    extractor_id,
420                    state: SynchronizerState::Started,
421                    modify_ts: Local::now().naive_utc(),
422                    rx,
423                },
424            );
425        }
426
427        // startup, schedule first set of futures and wait for them to return to initialise
428        // synchronizers.
429        debug!("Waiting for initial synchronizer messages...");
430        let mut startup_futures = Vec::new();
431        for (id, sh) in sync_streams.iter_mut() {
432            let fut = async {
433                let res = timeout(self.block_time + self.max_wait, sh.rx.recv()).await;
434                (id.clone(), res)
435            };
436            startup_futures.push(fut);
437        }
438        let mut ready_sync_msgs = HashMap::new();
439        let initial_headers = join_all(startup_futures)
440            .await
441            .into_iter()
442            .filter_map(|(extractor_id, res)| {
443                let synchronizer = sync_streams
444                .get_mut(&extractor_id)
445                .unwrap();
446            match res {
447                Ok(Some(msg)) => {
448                    debug!(%extractor_id, height=?&msg.header.number, "Synchronizer started successfully!");
449                    // initially default all synchronizers to Ready
450                    synchronizer.state = SynchronizerState::Ready(msg.header.clone());
451                    synchronizer.modify_ts = Local::now().naive_utc();
452                    ready_sync_msgs.insert(extractor_id.name.clone(), msg.clone());
453                    Some(msg.header)
454                }
455                Ok(None) => {
456                    warn!(%extractor_id, "Dead synchronizer at startup will be purged!");
457                    synchronizer.state = SynchronizerState::Ended;
458                    synchronizer.modify_ts = Local::now().naive_utc();
459                    None
460                }
461                Err(_) => {
462                    warn!(%extractor_id, "Timed out waiting for first message: Stale synchronizer at startup will be purged!");
463                    synchronizer.state = SynchronizerState::Stale(Header::default());
464                    synchronizer.modify_ts = Local::now().naive_utc();
465                    None
466                }
467            }
468        })
469        .collect::<HashSet<_>>() // remove duplicates
470        .into_iter()
471        .collect::<Vec<_>>();
472
473        let mut block_history = BlockHistory::new(initial_headers, 15)?;
474
475        // Determine the starting header for synchronization
476        let start_header = block_history
477            .latest()
478            .ok_or(BlockSynchronizerError::NoReadySynchronizers)?;
479        info!(
480            start_block=?start_header,
481            n_healthy=?ready_sync_msgs.len(),
482            "Block synchronization started successfully!"
483        );
484
485        // Purge any stale synchronizers
486        // All synchronizers that did not timeout on start up are initialized as Ready, including
487        // those that are Delayed. Delayed synchronizers are identified and updated accordingly in
488        // the next step.
489        sync_streams.retain(|_, v| matches!(v.state, SynchronizerState::Ready(_)));
490
491        // Determine correct state for each remaining synchronizer, based on their header vs the
492        // latest one
493        for (_, stream) in sync_streams.iter_mut() {
494            if let SynchronizerState::Ready(header) = &stream.state.clone() {
495                if header.number < start_header.number {
496                    stream.state = SynchronizerState::Delayed(header.clone());
497                    debug!(
498                        extractor_id=%stream.extractor_id,
499                        synchronizer_block=?header.number,
500                        current_block=?start_header.number,
501                        "Marking synchronizer as delayed during initialization"
502                    );
503                }
504            }
505        }
506
507        let (sync_tx, sync_rx) = mpsc::channel(30);
508        let main_loop_jh: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
509            let mut n_iter = 1;
510            loop {
511                // Send retrieved data to receivers.
512                sync_tx
513                    .send(FeedMessage::new(
514                        std::mem::take(&mut ready_sync_msgs),
515                        sync_streams
516                            .iter()
517                            .map(|(a, b)| (a.name.to_string(), b.state.clone()))
518                            .collect(),
519                    ))
520                    .await?;
521
522                // Check if we have reached the max messages
523                if let Some(max_messages) = self.max_messages {
524                    if n_iter >= max_messages {
525                        info!(max_messages, "StreamEnd");
526                        return Ok(());
527                    }
528                }
529                n_iter += 1;
530
531                // Here we simply wait block_time + max_wait. This will not work for chains with
532                // unknown block times but is simple enough for now.
533                // If we would like to support unknown block times we could: Instruct all handles to
534                // await the max block time, if a header arrives within that time transition as
535                // usual, but via a select statement get notified (using e.g. Notify) if any other
536                // handle finishes before the timeout. Then await again but this time only for
537                // max_wait and then proceed as usual. So basically each try_advance task would have
538                // a select statement that allows it to exit the first timeout preemptively if any
539                // other try_advance task finished earlier.
540                let mut recv_futures = Vec::new();
541                for (extractor_id, sh) in sync_streams.iter_mut() {
542                    recv_futures.push(async {
543                        let res = sh
544                            .try_advance(
545                                &block_history,
546                                self.block_time + self.max_wait,
547                                self.block_time
548                                    .mul_f64(self.max_missed_blocks as f64),
549                            )
550                            .await?;
551                        Ok::<_, BlockSynchronizerError>(
552                            res.map(|msg| (extractor_id.name.clone(), msg)),
553                        )
554                    });
555                }
556                ready_sync_msgs.extend(
557                    join_all(recv_futures)
558                        .await
559                        .into_iter()
560                        .collect::<Result<Vec<_>, _>>()?
561                        .into_iter()
562                        .flatten(),
563                );
564
565                // Purge any bad synchronizers, respective warnings have already been issued at
566                // transition time.
567                sync_streams.retain(|_, v| match v.state {
568                    SynchronizerState::Started | SynchronizerState::Ended => false,
569                    SynchronizerState::Stale(_) => false,
570                    SynchronizerState::Ready(_) => true,
571                    SynchronizerState::Delayed(_) => true,
572                    SynchronizerState::Advanced(_) => true,
573                });
574
575                block_history.push(
576                    sync_streams
577                        .values()
578                        .filter_map(|v| match &v.state {
579                            SynchronizerState::Ready(b) => Some(b),
580                            _ => None,
581                        })
582                        .next()
583                        // no synchronizers are ready
584                        .ok_or(BlockSynchronizerError::NoReadySynchronizers)?
585                        .clone(),
586                )?;
587            }
588        });
589
590        let nanny_jh = tokio::spawn(async move {
591            select! {
592                error = state_sync_tasks.select_next_some() => {
593                    for s in sync_handles.iter_mut() {
594                        if let Err(e) = s.close().await {
595                            error!(error=?e, "Failed to close synchronizer: was not started!");
596                        }
597                    }
598                    error!(?error, "State synchronizer exited");
599                },
600                error = main_loop_jh => {
601                    error!(?error, "Feed main loop exited");
602                }
603            }
604        });
605        Ok((nanny_jh, sync_rx))
606    }
607}
608
609#[cfg(test)]
610mod tests {
611    use std::sync::Arc;
612
613    use async_trait::async_trait;
614    use test_log::test;
615    use tokio::sync::{oneshot, Mutex};
616    use tycho_common::dto::Chain;
617
618    use super::{synchronizer::SynchronizerError, *};
619    use crate::feed::synchronizer::SyncResult;
620
621    #[derive(Clone)]
622    struct MockStateSync {
623        header_tx: mpsc::Sender<StateSyncMessage>,
624        header_rx: Arc<Mutex<Option<Receiver<StateSyncMessage>>>>,
625        end_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
626    }
627
628    impl MockStateSync {
629        fn new() -> Self {
630            let (tx, rx) = mpsc::channel(1);
631            Self {
632                header_tx: tx,
633                header_rx: Arc::new(Mutex::new(Some(rx))),
634                end_tx: Arc::new(Mutex::new(None)),
635            }
636        }
637        async fn send_header(&self, header: StateSyncMessage) {
638            self.header_tx
639                .send(header)
640                .await
641                .expect("sending header failed");
642        }
643    }
644
645    #[async_trait]
646    impl StateSynchronizer for MockStateSync {
647        async fn initialize(&self) -> SyncResult<()> {
648            Ok(())
649        }
650
651        async fn start(
652            &self,
653        ) -> SyncResult<(JoinHandle<SyncResult<()>>, Receiver<StateSyncMessage>)> {
654            let block_rx = {
655                let mut guard = self.header_rx.lock().await;
656                guard
657                    .take()
658                    .expect("Block receiver was not set!")
659            };
660
661            let end_rx = {
662                let (end_tx, end_rx) = oneshot::channel();
663                let mut guard = self.end_tx.lock().await;
664                *guard = Some(end_tx);
665                end_rx
666            };
667
668            let jh = tokio::spawn(async move {
669                let _ = end_rx.await;
670                SyncResult::Ok(())
671            });
672
673            Ok((jh, block_rx))
674        }
675
676        async fn close(&mut self) -> SyncResult<()> {
677            let mut guard = self.end_tx.lock().await;
678            if let Some(tx) = guard.take() {
679                tx.send(())
680                    .expect("end channel closed!");
681                Ok(())
682            } else {
683                Err(SynchronizerError::CloseError("Not Started".to_string()))
684            }
685        }
686    }
687
688    #[test(tokio::test)]
689    async fn test_two_ready_synchronizers() {
690        let v2_sync = MockStateSync::new();
691        let v3_sync = MockStateSync::new();
692        let block_sync = BlockSynchronizer::new(
693            std::time::Duration::from_millis(500),
694            std::time::Duration::from_millis(50),
695            10,
696        )
697        .register_synchronizer(
698            ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
699            v2_sync.clone(),
700        )
701        .register_synchronizer(
702            ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
703            v3_sync.clone(),
704        );
705        let start_msg = StateSyncMessage {
706            header: Header { number: 1, ..Default::default() },
707            ..Default::default()
708        };
709        v2_sync
710            .send_header(start_msg.clone())
711            .await;
712        v3_sync
713            .send_header(start_msg.clone())
714            .await;
715
716        let (_jh, mut rx) = block_sync
717            .run()
718            .await
719            .expect("BlockSynchronizer failed to start.");
720        let first_feed_msg = rx
721            .recv()
722            .await
723            .expect("header channel was closed");
724        let second_msg = StateSyncMessage {
725            header: Header { number: 2, ..Default::default() },
726            ..Default::default()
727        };
728        v2_sync
729            .send_header(second_msg.clone())
730            .await;
731        v3_sync
732            .send_header(second_msg.clone())
733            .await;
734        let second_feed_msg = rx
735            .recv()
736            .await
737            .expect("header channel was closed!");
738
739        let exp1 = FeedMessage {
740            state_msgs: [
741                ("uniswap-v2".to_string(), start_msg.clone()),
742                ("uniswap-v3".to_string(), start_msg.clone()),
743            ]
744            .into_iter()
745            .collect(),
746            sync_states: [
747                ("uniswap-v3".to_string(), SynchronizerState::Ready(start_msg.header.clone())),
748                ("uniswap-v2".to_string(), SynchronizerState::Ready(start_msg.header.clone())),
749            ]
750            .into_iter()
751            .collect(),
752        };
753        let exp2 = FeedMessage {
754            state_msgs: [
755                ("uniswap-v2".to_string(), second_msg.clone()),
756                ("uniswap-v3".to_string(), second_msg.clone()),
757            ]
758            .into_iter()
759            .collect(),
760            sync_states: [
761                ("uniswap-v3".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
762                ("uniswap-v2".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
763            ]
764            .into_iter()
765            .collect(),
766        };
767        assert_eq!(first_feed_msg, exp1);
768        assert_eq!(second_feed_msg, exp2);
769    }
770
771    #[test(tokio::test)]
772    async fn test_delayed_synchronizer_catches_up() {
773        let v2_sync = MockStateSync::new();
774        let v3_sync = MockStateSync::new();
775        let block_sync = BlockSynchronizer::new(
776            std::time::Duration::from_millis(500),
777            std::time::Duration::from_millis(50),
778            10,
779        )
780        .register_synchronizer(
781            ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
782            v2_sync.clone(),
783        )
784        .register_synchronizer(
785            ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
786            v3_sync.clone(),
787        );
788
789        // Initial messages - both synchronizers are at block 1
790        let block1_msg = StateSyncMessage {
791            header: Header {
792                number: 1,
793                hash: Bytes::from(vec![1]),
794                parent_hash: Bytes::from(vec![0]),
795                revert: false,
796                ..Default::default()
797            },
798            ..Default::default()
799        };
800        v2_sync
801            .send_header(block1_msg.clone())
802            .await;
803        v3_sync
804            .send_header(block1_msg.clone())
805            .await;
806
807        // Start the block synchronizer
808        let (_jh, mut rx) = block_sync
809            .run()
810            .await
811            .expect("BlockSynchronizer failed to start.");
812
813        // Consume the first message
814        let first_feed_msg = rx
815            .recv()
816            .await
817            .expect("header channel was closed");
818        assert_eq!(first_feed_msg.state_msgs.len(), 2);
819        assert!(matches!(
820            first_feed_msg
821                .sync_states
822                .get("uniswap-v2")
823                .unwrap(),
824            SynchronizerState::Ready(_)
825        ));
826        assert!(matches!(
827            first_feed_msg
828                .sync_states
829                .get("uniswap-v3")
830                .unwrap(),
831            SynchronizerState::Ready(_)
832        ));
833
834        // Send block 2 to v2 synchronizer only
835        let block2_msg = StateSyncMessage {
836            header: Header {
837                number: 2,
838                hash: Bytes::from(vec![2]),
839                parent_hash: Bytes::from(vec![1]),
840                revert: false,
841                ..Default::default()
842            },
843            ..Default::default()
844        };
845        v2_sync
846            .send_header(block2_msg.clone())
847            .await;
848
849        // Consume second message - v3 should be delayed
850        let second_feed_msg = rx
851            .recv()
852            .await
853            .expect("header channel was closed");
854        assert!(second_feed_msg
855            .state_msgs
856            .contains_key("uniswap-v2"));
857        assert!(matches!(
858            second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
859            SynchronizerState::Ready(header) if header.number == 2
860        ));
861        assert!(!second_feed_msg
862            .state_msgs
863            .contains_key("uniswap-v3"));
864        assert!(matches!(
865            second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
866            SynchronizerState::Delayed(header) if header.number == 1
867        ));
868
869        // Now v3 catches up to block 2
870        v3_sync
871            .send_header(block2_msg.clone())
872            .await;
873
874        // Both advance to block 3
875        let block3_msg = StateSyncMessage {
876            header: Header {
877                number: 3,
878                hash: Bytes::from(vec![3]),
879                parent_hash: Bytes::from(vec![2]),
880                revert: false,
881                ..Default::default()
882            },
883            ..Default::default()
884        };
885        v2_sync
886            .send_header(block3_msg.clone())
887            .await;
888        v3_sync.send_header(block3_msg).await;
889
890        // Consume third message - both should be on block 3
891        let third_feed_msg = rx
892            .recv()
893            .await
894            .expect("header channel was closed");
895        assert!(third_feed_msg
896            .state_msgs
897            .contains_key("uniswap-v2"));
898        assert!(third_feed_msg
899            .state_msgs
900            .contains_key("uniswap-v3"));
901        assert!(matches!(
902            third_feed_msg.sync_states.get("uniswap-v2").unwrap(),
903            SynchronizerState::Ready(header) if header.number == 3
904        ));
905        assert!(matches!(
906            third_feed_msg.sync_states.get("uniswap-v3").unwrap(),
907            SynchronizerState::Ready(header) if header.number == 3
908        ));
909    }
910
911    #[test(tokio::test)]
912    async fn test_different_start_blocks() {
913        let v2_sync = MockStateSync::new();
914        let v3_sync = MockStateSync::new();
915        let block_sync = BlockSynchronizer::new(
916            std::time::Duration::from_millis(500),
917            std::time::Duration::from_millis(50),
918            10,
919        )
920        .register_synchronizer(
921            ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
922            v2_sync.clone(),
923        )
924        .register_synchronizer(
925            ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
926            v3_sync.clone(),
927        );
928
929        // Initial messages - synchronizers at different blocks
930        let block1_msg = StateSyncMessage {
931            header: Header {
932                number: 1,
933                hash: Bytes::from(vec![1]),
934                parent_hash: Bytes::from(vec![0]),
935                revert: false,
936                ..Default::default()
937            },
938            ..Default::default()
939        };
940        let block2_msg = StateSyncMessage {
941            header: Header {
942                number: 2,
943                hash: Bytes::from(vec![2]),
944                parent_hash: Bytes::from(vec![1]),
945                revert: false,
946                ..Default::default()
947            },
948            ..Default::default()
949        };
950
951        v2_sync
952            .send_header(block1_msg.clone())
953            .await;
954        v3_sync
955            .send_header(block2_msg.clone())
956            .await;
957
958        // Start the block synchronizer - it should use block 2 as the starting block
959        let (_jh, mut rx) = block_sync
960            .run()
961            .await
962            .expect("BlockSynchronizer failed to start.");
963
964        // Consume first message
965        let first_feed_msg = rx
966            .recv()
967            .await
968            .expect("header channel was closed");
969        assert!(matches!(
970            first_feed_msg.sync_states.get("uniswap-v2").unwrap(),
971            SynchronizerState::Delayed(header) if header.number == 1
972        ));
973        assert!(matches!(
974            first_feed_msg.sync_states.get("uniswap-v3").unwrap(),
975            SynchronizerState::Ready(header) if header.number == 2
976        ));
977
978        // Now v2 catches up to block 2
979        v2_sync
980            .send_header(block2_msg.clone())
981            .await;
982
983        // Both advance to block 3
984        let block3_msg = StateSyncMessage {
985            header: Header {
986                number: 3,
987                hash: Bytes::from(vec![3]),
988                parent_hash: Bytes::from(vec![2]),
989                revert: false,
990                ..Default::default()
991            },
992            ..Default::default()
993        };
994        v2_sync
995            .send_header(block3_msg.clone())
996            .await;
997        v3_sync
998            .send_header(block3_msg.clone())
999            .await;
1000
1001        // Consume third message - both should be on block 3
1002        let second_feed_msg = rx
1003            .recv()
1004            .await
1005            .expect("header channel was closed");
1006        assert_eq!(second_feed_msg.state_msgs.len(), 2);
1007        assert!(matches!(
1008            second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1009            SynchronizerState::Ready(header) if header.number == 3
1010        ));
1011        assert!(matches!(
1012            second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1013            SynchronizerState::Ready(header) if header.number == 3
1014        ));
1015    }
1016}