tycho_client/feed/
mod.rs

1use std::collections::{HashMap, HashSet};
2
3use chrono::{Duration as ChronoDuration, Local, NaiveDateTime};
4use futures03::{
5    future::{join_all, try_join_all},
6    stream::FuturesUnordered,
7    StreamExt,
8};
9use serde::{Deserialize, Serialize};
10use tokio::{
11    select,
12    sync::mpsc::{self, Receiver},
13    task::JoinHandle,
14    time::timeout,
15};
16use tracing::{debug, error, info, trace, warn};
17use tycho_common::{
18    dto::{Block, ExtractorIdentity},
19    Bytes,
20};
21
22use crate::feed::{
23    block_history::{BlockHistory, BlockPosition},
24    synchronizer::{StateSyncMessage, StateSynchronizer},
25};
26
27mod block_history;
28pub mod component_tracker;
29pub mod synchronizer;
30
31#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, Eq, Hash)]
32pub struct Header {
33    pub hash: Bytes,
34    pub number: u64,
35    pub parent_hash: Bytes,
36    pub revert: bool,
37}
38
39impl Header {
40    fn from_block(block: &Block, revert: bool) -> Self {
41        Self {
42            hash: block.hash.clone(),
43            number: block.number,
44            parent_hash: block.parent_hash.clone(),
45            revert,
46        }
47    }
48}
49
50type BlockSyncResult<T> = anyhow::Result<T>;
51
52/// Aligns multiple StateSynchronizers on the block dimension.
53///
54/// ## Purpose
55/// The purpose of this component is to handle streams from multiple state synchronizers and
56/// align/merge them according to their blocks. Ideally this should be done in a fault-tolerant way,
57/// meaning we can recover from a state synchronizer suffering from timing issues. E.g. a delayed or
58/// unresponsive state synchronizer might recover again, or an advanced state synchronizer can be
59/// included again once we reach the block it is at.
60///
61/// ## Limitations
62/// - Supports only chains with fixed blocks time for now due to the lock step mechanism.
63///
64/// ## Initialisation
65/// Queries all registered synchronizers for their first message and evaluates the state of each
66/// synchronizer. If a synchronizer's first message is an older block, it is marked as delayed.
67/// If no message is received within the startup timeout, the synchronizer is marked as stale and is
68/// closed.
69///
70/// ## Main loop
71/// Once started, the synchronizers are queried concurrently for messages in lock step:
72/// the main loop queries all synchronizers in ready for the last emitted data, builds the
73/// `FeedMessage` and emits it, then it schedules the wait procedure for the next block.
74///
75/// ## Synchronization Logic
76///
77/// To classify a synchronizer as delayed, we need to first define the current block. The highest
78/// block number of all ready synchronizers is considered the current block.
79///
80/// Once we have the current block we can easily determine which block we expect next. And if a
81/// synchronizer delivers an older block we can classify it as delayed.
82///
83/// If any synchronizer is not in the ready state we will try to bring it back to the ready state.
84/// This is done by trying to empty any buffers of a delayed synchronizer or waiting to reach
85/// the height of an advanced synchronizer (and flagging it as such in the meantime).
86///
87/// Of course, we can't wait forever for a synchronizer to reply/recover. All of this must happen
88/// within the block production step of the blockchain:
89/// The wait procedure consists of waiting for any of the receivers to emit a new message (within a
90/// max timeout - several multiples of the block time). Once a message is received a very short
91/// timeout start for the remaining synchronizers, to deliver a message. Any synchronizer failing to
92/// do so is transitioned to delayed.
93///
94/// ### Note
95/// The described process above is the goal. It is currently not implemented like that. Instead we
96/// simply wait `block_time` + `wait_time`. Synchronizers are expected to respond within that
97/// timeout. This is simpler but only works well on chains with fixed block times.
98pub struct BlockSynchronizer<S> {
99    synchronizers: Option<HashMap<ExtractorIdentity, S>>,
100    block_time: std::time::Duration,
101    max_wait: std::time::Duration,
102    max_messages: Option<usize>,
103    max_missed_blocks: u64,
104}
105
106#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
107#[serde(tag = "status", rename_all = "lowercase")]
108pub enum SynchronizerState {
109    Started,
110    Ready(Header),
111    // no progress, we consider it stale at that point we should purge it.
112    Stale(Header),
113    Delayed(Header),
114    // For this to happen we must have a gap, and a gap usually means a new snapshot from the
115    // StateSynchronizer. This can only happen if we are processing too slow and one of the
116    // synchronizers restarts e.g. because Tycho ended the subscription.
117    Advanced(Header),
118    Ended,
119}
120
121pub struct SynchronizerStream {
122    extractor_id: ExtractorIdentity,
123    state: SynchronizerState,
124    modify_ts: NaiveDateTime,
125    rx: Receiver<StateSyncMessage>,
126}
127
128impl SynchronizerStream {
129    async fn try_advance(
130        &mut self,
131        block_history: &BlockHistory,
132        max_wait: std::time::Duration,
133        stale_threshold: std::time::Duration,
134    ) -> Option<StateSyncMessage> {
135        let extractor_id = &self.extractor_id;
136        let latest_block = block_history.latest();
137        match &self.state {
138            SynchronizerState::Started | SynchronizerState::Ended => {
139                warn!(state=?&self.state, "Advancing Synchronizer in this state not supported!");
140                None
141            }
142            SynchronizerState::Advanced(b) => {
143                let future_block = b.clone();
144                // Transition to ready once we arrived at the expected height
145                self.transition(future_block, block_history, stale_threshold);
146                None
147            }
148            SynchronizerState::Ready(previous_block) => {
149                // Try to recv the next expected block, update state accordingly.
150                self.try_recv_next_expected(
151                    max_wait,
152                    block_history,
153                    previous_block.clone(),
154                    stale_threshold,
155                )
156                .await
157                // TODO: if we entered advanced state we need to buffer the message for a while.
158            }
159            SynchronizerState::Delayed(old_block) => {
160                // try to catch up all currently queued blocks until the expected block
161                debug!(
162                    ?old_block,
163                    ?latest_block,
164                    %extractor_id,
165                    "Trying to catch up to latest block"
166                );
167                self.try_catch_up(block_history, max_wait, stale_threshold)
168                    .await
169            }
170            SynchronizerState::Stale(old_block) => {
171                debug!(
172                    ?old_block,
173                    ?latest_block,
174                    %extractor_id,
175                    "Trying to catch up to latest block"
176                );
177                self.try_catch_up(block_history, max_wait, stale_threshold)
178                    .await
179            }
180        }
181    }
182
183    /// Standard way to advance a well-behaved state synchronizer.
184    ///
185    /// Will wait for a new block on the synchronizer within a timeout. And modify it's state based
186    /// on the outcome.
187    async fn try_recv_next_expected(
188        &mut self,
189        max_wait: std::time::Duration,
190        block_history: &BlockHistory,
191        previous_block: Header,
192        stale_threshold: std::time::Duration,
193    ) -> Option<StateSyncMessage> {
194        let extractor_id = &self.extractor_id;
195        match timeout(max_wait, self.rx.recv()).await {
196            Ok(Some(msg)) => {
197                self.transition(msg.header.clone(), block_history, stale_threshold);
198                Some(msg)
199            }
200            Ok(None) => {
201                error!(
202                    %extractor_id,
203                    ?previous_block,
204                    "Extractor terminated: channel closed!"
205                );
206                self.state = SynchronizerState::Ended;
207                self.modify_ts = Local::now().naive_utc();
208                None
209            }
210            Err(_) => {
211                // trying to advance a block timed out
212                debug!(%extractor_id, ?previous_block, "Extractor did not check in within time.");
213                self.state = SynchronizerState::Delayed(previous_block.clone());
214                None
215            }
216        }
217    }
218
219    /// Tries to catch up a delayed state synchronizer.
220    ///
221    /// If a synchronizer is delayed, this method will try to catch up to the next expected block
222    /// by consuming all waiting messages in it's queue and waiting for any new block messages
223    /// within a timeout. Finally, all update messages are merged into one and returned.
224    async fn try_catch_up(
225        &mut self,
226        block_history: &BlockHistory,
227        max_wait: std::time::Duration,
228        stale_threshold: std::time::Duration,
229    ) -> Option<StateSyncMessage> {
230        let mut results = Vec::new();
231        let extractor_id = &self.extractor_id;
232
233        // Set a deadline for the overall catch-up operation
234        let deadline = std::time::Instant::now() + max_wait;
235
236        while std::time::Instant::now() < deadline {
237            match timeout(
238                deadline.saturating_duration_since(std::time::Instant::now()),
239                self.rx.recv(),
240            )
241            .await
242            {
243                Ok(Some(msg)) => {
244                    debug!(%extractor_id, block_num=?msg.header.number, "Received new message during catch-up");
245                    let block_pos = block_history
246                        .determine_block_position(&msg.header)
247                        .unwrap();
248                    results.push(msg);
249                    if matches!(block_pos, BlockPosition::NextExpected) {
250                        break;
251                    }
252                }
253                Ok(None) => {
254                    warn!(%extractor_id, "Channel closed during catch-up");
255                    self.state = SynchronizerState::Ended;
256                    return None;
257                }
258                Err(_) => {
259                    debug!(%extractor_id, "Timed out waiting for catch-up");
260                    break;
261                }
262            }
263        }
264
265        let merged = results
266            .into_iter()
267            .reduce(|l, r| l.merge(r));
268
269        if let Some(msg) = merged {
270            // we were able to get at least one block out
271            debug!(?extractor_id, "Delayed extractor made progress!");
272            self.transition(msg.header.clone(), block_history, stale_threshold);
273            Some(msg)
274        } else {
275            None
276        }
277    }
278
279    /// Logic to transition a state synchronizer based on newly received block
280    ///
281    /// Updates the synchronizer's state according to the position of the received block:
282    /// - Next expected block -> Ready state
283    /// - Latest/Delayed block -> Either Delayed or Stale (if >60s since last update)
284    /// - Advanced block -> Advanced state (block ahead of expected position)
285    fn transition(
286        &mut self,
287        latest_retrieved: Header,
288        block_history: &BlockHistory,
289        stale_threshold: std::time::Duration,
290    ) {
291        let extractor_id = &self.extractor_id;
292        let last_message_at = self.modify_ts;
293        let block = &latest_retrieved;
294
295        match block_history
296            .determine_block_position(&latest_retrieved)
297            .expect("Block positiion could not be determined.")
298        {
299            BlockPosition::NextExpected => {
300                self.state = SynchronizerState::Ready(latest_retrieved.clone());
301            }
302            BlockPosition::Latest | BlockPosition::Delayed => {
303                let now = Local::now().naive_utc();
304                let wait_duration = self
305                    .modify_ts
306                    .signed_duration_since(now);
307                let stale_threshold_chrono = ChronoDuration::from_std(stale_threshold)
308                    .expect("Staleness threshold could not convert to chrono Duration");
309                if wait_duration > stale_threshold_chrono {
310                    warn!(
311                        ?extractor_id,
312                        ?last_message_at,
313                        ?block,
314                        "Extractor transition to stale."
315                    );
316                    self.state = SynchronizerState::Stale(latest_retrieved.clone());
317                } else {
318                    warn!(
319                        ?extractor_id,
320                        ?last_message_at,
321                        ?block,
322                        "Extractor transition to delayed."
323                    );
324                    self.state = SynchronizerState::Delayed(latest_retrieved.clone());
325                }
326            }
327            BlockPosition::Advanced => {
328                error!(
329                    ?extractor_id,
330                    ?last_message_at,
331                    ?block,
332                    "Extractor transition to advanced."
333                );
334                self.state = SynchronizerState::Advanced(latest_retrieved.clone());
335            }
336        }
337        self.modify_ts = Local::now().naive_utc();
338    }
339}
340
341#[derive(Debug, PartialEq, Serialize, Deserialize)]
342pub struct FeedMessage {
343    pub state_msgs: HashMap<String, StateSyncMessage>,
344    pub sync_states: HashMap<String, SynchronizerState>,
345}
346
347impl FeedMessage {
348    fn new(
349        state_msgs: HashMap<String, StateSyncMessage>,
350        sync_states: HashMap<String, SynchronizerState>,
351    ) -> Self {
352        Self { state_msgs, sync_states }
353    }
354}
355
356impl<S> BlockSynchronizer<S>
357where
358    S: StateSynchronizer,
359{
360    pub fn new(
361        block_time: std::time::Duration,
362        max_wait: std::time::Duration,
363        max_missed_blocks: u64,
364    ) -> Self {
365        Self { synchronizers: None, max_messages: None, block_time, max_wait, max_missed_blocks }
366    }
367
368    pub fn max_messages(&mut self, val: usize) {
369        self.max_messages = Some(val);
370    }
371
372    pub fn register_synchronizer(mut self, id: ExtractorIdentity, synchronizer: S) -> Self {
373        let mut registered = self.synchronizers.unwrap_or_default();
374        registered.insert(id, synchronizer);
375        self.synchronizers = Some(registered);
376        self
377    }
378    pub async fn run(mut self) -> BlockSyncResult<(JoinHandle<()>, Receiver<FeedMessage>)> {
379        trace!("Starting BlockSynchronizer...");
380        let mut state_sync_tasks = FuturesUnordered::new();
381        let mut synchronizers = self
382            .synchronizers
383            .take()
384            .expect("No synchronizers set!");
385        // init synchronizers
386        let init_tasks = synchronizers
387            .values()
388            .map(|s| s.initialize())
389            .collect::<Vec<_>>();
390        try_join_all(init_tasks).await?;
391
392        let mut sync_streams = HashMap::with_capacity(synchronizers.len());
393        let mut sync_handles = Vec::new();
394        for (extractor_id, synchronizer) in synchronizers.drain() {
395            let (jh, rx) = synchronizer.start().await?;
396            state_sync_tasks.push(jh);
397            sync_handles.push(synchronizer);
398
399            sync_streams.insert(
400                extractor_id.clone(),
401                SynchronizerStream {
402                    extractor_id,
403                    state: SynchronizerState::Started,
404                    modify_ts: Local::now().naive_utc(),
405                    rx,
406                },
407            );
408        }
409
410        // startup, schedule first set of futures and wait for them to return to initialise
411        // synchronizers.
412        debug!("Waiting for initial synchronizer messages...");
413        let mut startup_futures = Vec::new();
414        for (id, sh) in sync_streams.iter_mut() {
415            let fut = async {
416                let res = timeout(self.block_time + self.max_wait, sh.rx.recv()).await;
417                (id.clone(), res)
418            };
419            startup_futures.push(fut);
420        }
421        let mut ready_sync_msgs = HashMap::new();
422        let initial_headers = join_all(startup_futures)
423            .await
424            .into_iter()
425            .filter_map(|(extractor_id, res)| {
426                let synchronizer = sync_streams
427                .get_mut(&extractor_id)
428                .unwrap();
429            match res {
430                Ok(Some(msg)) => {
431                    debug!(%extractor_id, height=?&msg.header.number, "Synchronizer started successfully!");
432                    // initially default all synchronizers to Ready
433                    synchronizer.state = SynchronizerState::Ready(msg.header.clone());
434                    synchronizer.modify_ts = Local::now().naive_utc();
435                    ready_sync_msgs.insert(extractor_id.name.clone(), msg.clone());
436                    Some(msg.header)
437                }
438                Ok(None) => {
439                    warn!(%extractor_id, "Dead synchronizer at startup will be purged!");
440                    synchronizer.state = SynchronizerState::Ended;
441                    synchronizer.modify_ts = Local::now().naive_utc();
442                    None
443                }
444                Err(_) => {
445                    warn!(%extractor_id, "Timed out waiting for first message: Stale synchronizer at startup will be purged!");
446                    synchronizer.state = SynchronizerState::Stale(Header::default());
447                    synchronizer.modify_ts = Local::now().naive_utc();
448                    None
449                }
450            }
451        })
452        .collect::<HashSet<_>>() // remove duplicates
453        .into_iter()
454        .collect::<Vec<_>>();
455
456        let mut block_history = BlockHistory::new(initial_headers, 15);
457
458        // Determine the starting header for synchronization
459        let start_header = block_history
460            .latest()
461            .ok_or_else(|| anyhow::anyhow!("No synchronizers were ready for the operation"))?;
462        info!(
463            start_block=?start_header,
464            n_healthy=?ready_sync_msgs.len(),
465            "Block synchronization started successfully!"
466        );
467
468        // Purge any stale synchronizers
469        // All synchronizers that did not timeout on start up are initialized as Ready, including
470        // those that are Delayed. Delayed synchronizers are identified and updated accordingly in
471        // the next step.
472        sync_streams.retain(|_, v| matches!(v.state, SynchronizerState::Ready(_)));
473
474        // Determine correct state for each remaining synchronizer, based on their header vs the
475        // latest one
476        for (_, stream) in sync_streams.iter_mut() {
477            if let SynchronizerState::Ready(header) = &stream.state.clone() {
478                if header.number < start_header.number {
479                    stream.state = SynchronizerState::Delayed(header.clone());
480                    debug!(
481                        extractor_id=%stream.extractor_id,
482                        synchronizer_block=?header.number,
483                        current_block=?start_header.number,
484                        "Marking synchronizer as delayed during initialization"
485                    );
486                }
487            }
488        }
489
490        let (sync_tx, sync_rx) = mpsc::channel(30);
491        let main_loop_jh: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
492            let mut n_iter = 1;
493            loop {
494                // Send retrieved data to receivers.
495                sync_tx
496                    .send(FeedMessage::new(
497                        std::mem::take(&mut ready_sync_msgs),
498                        sync_streams
499                            .iter()
500                            .map(|(a, b)| (a.name.to_string(), b.state.clone()))
501                            .collect(),
502                    ))
503                    .await?;
504
505                // Check if we have reached the max messages
506                if let Some(max_messages) = self.max_messages {
507                    if n_iter >= max_messages {
508                        info!(max_messages, "StreamEnd");
509                        return Ok(());
510                    }
511                }
512                n_iter += 1;
513
514                // Here we simply wait block_time + max_wait. This will not work for chains with
515                // unknown block times but is simple enough for now.
516                // If we would like to support unknown block times we could: Instruct all handles to
517                // await the max block time, if a header arrives within that time transition as
518                // usual, but via a select statement get notified (using e.g. Notify) if any other
519                // handle finishes before the timeout. Then await again but this time only for
520                // max_wait and then proceed as usual. So basically each try_advance task would have
521                // a select statement that allows it to exit the first timeout preemptively if any
522                // other try_advance task finished earlier.
523                let mut recv_futures = Vec::new();
524                for (extractor_id, sh) in sync_streams.iter_mut() {
525                    recv_futures.push(async {
526                        let res = sh
527                            .try_advance(
528                                &block_history,
529                                self.block_time + self.max_wait,
530                                self.block_time
531                                    .mul_f64(self.max_missed_blocks as f64),
532                            )
533                            .await;
534                        res.map(|msg| (extractor_id.name.clone(), msg))
535                    });
536                }
537                ready_sync_msgs.extend(
538                    join_all(recv_futures)
539                        .await
540                        .into_iter()
541                        .flatten(),
542                );
543
544                // Purge any bad synchronizers, respective warnings have already been issued at
545                // transition time.
546                sync_streams.retain(|_, v| match v.state {
547                    SynchronizerState::Started | SynchronizerState::Ended => false,
548                    SynchronizerState::Stale(_) => false,
549                    SynchronizerState::Ready(_) => true,
550                    SynchronizerState::Delayed(_) => true,
551                    SynchronizerState::Advanced(_) => true,
552                });
553
554                block_history
555                    .push(
556                        sync_streams
557                            .values()
558                            .filter_map(|v| match &v.state {
559                                SynchronizerState::Ready(b) => Some(b),
560                                _ => None,
561                            })
562                            .next()
563                            // no synchronizers is ready
564                            .ok_or_else(|| {
565                                anyhow::format_err!("Not a single synchronizer was ready.")
566                            })?
567                            .clone(),
568                    )
569                    .map_err(|err| anyhow::format_err!("Failed processing new block: {}", err))?;
570            }
571        });
572
573        let nanny_jh = tokio::spawn(async move {
574            select! {
575                error = state_sync_tasks.select_next_some() => {
576                    for s in sync_handles.iter_mut() {
577                        s.close().await.expect("StateSynchronizer was not started!");
578                    }
579                    error!(?error, "state synchronizer exited");
580                },
581                error = main_loop_jh => {
582                    error!(?error, "feed main loop exited");
583                }
584            }
585        });
586        Ok((nanny_jh, sync_rx))
587    }
588}
589
590#[cfg(test)]
591mod tests {
592    use std::sync::Arc;
593
594    use async_trait::async_trait;
595    use test_log::test;
596    use tokio::sync::{oneshot, Mutex};
597    use tycho_common::dto::Chain;
598
599    use super::*;
600    use crate::feed::synchronizer::SyncResult;
601
602    #[derive(Clone)]
603    struct MockStateSync {
604        header_tx: mpsc::Sender<StateSyncMessage>,
605        header_rx: Arc<Mutex<Option<Receiver<StateSyncMessage>>>>,
606        end_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
607    }
608
609    impl MockStateSync {
610        fn new() -> Self {
611            let (tx, rx) = mpsc::channel(1);
612            Self {
613                header_tx: tx,
614                header_rx: Arc::new(Mutex::new(Some(rx))),
615                end_tx: Arc::new(Mutex::new(None)),
616            }
617        }
618        async fn send_header(&self, header: StateSyncMessage) {
619            self.header_tx
620                .send(header)
621                .await
622                .expect("sending header failed");
623        }
624    }
625
626    #[async_trait]
627    impl StateSynchronizer for MockStateSync {
628        async fn initialize(&self) -> SyncResult<()> {
629            Ok(())
630        }
631
632        async fn start(
633            &self,
634        ) -> SyncResult<(JoinHandle<SyncResult<()>>, Receiver<StateSyncMessage>)> {
635            let block_rx = {
636                let mut guard = self.header_rx.lock().await;
637                guard
638                    .take()
639                    .expect("Block receiver was not set!")
640            };
641
642            let end_rx = {
643                let (end_tx, end_rx) = oneshot::channel();
644                let mut guard = self.end_tx.lock().await;
645                *guard = Some(end_tx);
646                end_rx
647            };
648
649            let jh = tokio::spawn(async move {
650                let _ = end_rx.await;
651                SyncResult::Ok(())
652            });
653
654            Ok((jh, block_rx))
655        }
656
657        async fn close(&mut self) -> SyncResult<()> {
658            let mut guard = self.end_tx.lock().await;
659            if let Some(tx) = guard.take() {
660                tx.send(())
661                    .expect("end channel closed!");
662                Ok(())
663            } else {
664                Err(anyhow::format_err!("Not connected"))
665            }
666        }
667    }
668
669    #[test(tokio::test)]
670    async fn test_two_ready_synchronizers() {
671        let v2_sync = MockStateSync::new();
672        let v3_sync = MockStateSync::new();
673        let block_sync = BlockSynchronizer::new(
674            std::time::Duration::from_millis(500),
675            std::time::Duration::from_millis(50),
676            10,
677        )
678        .register_synchronizer(
679            ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
680            v2_sync.clone(),
681        )
682        .register_synchronizer(
683            ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
684            v3_sync.clone(),
685        );
686        let start_msg = StateSyncMessage {
687            header: Header { number: 1, ..Default::default() },
688            ..Default::default()
689        };
690        v2_sync
691            .send_header(start_msg.clone())
692            .await;
693        v3_sync
694            .send_header(start_msg.clone())
695            .await;
696
697        let (_jh, mut rx) = block_sync
698            .run()
699            .await
700            .expect("BlockSynchronizer failed to start.");
701        let first_feed_msg = rx
702            .recv()
703            .await
704            .expect("header channel was closed");
705        let second_msg = StateSyncMessage {
706            header: Header { number: 2, ..Default::default() },
707            ..Default::default()
708        };
709        v2_sync
710            .send_header(second_msg.clone())
711            .await;
712        v3_sync
713            .send_header(second_msg.clone())
714            .await;
715        let second_feed_msg = rx
716            .recv()
717            .await
718            .expect("header channel was closed!");
719
720        let exp1 = FeedMessage {
721            state_msgs: [
722                ("uniswap-v2".to_string(), start_msg.clone()),
723                ("uniswap-v3".to_string(), start_msg.clone()),
724            ]
725            .into_iter()
726            .collect(),
727            sync_states: [
728                ("uniswap-v3".to_string(), SynchronizerState::Ready(start_msg.header.clone())),
729                ("uniswap-v2".to_string(), SynchronizerState::Ready(start_msg.header.clone())),
730            ]
731            .into_iter()
732            .collect(),
733        };
734        let exp2 = FeedMessage {
735            state_msgs: [
736                ("uniswap-v2".to_string(), second_msg.clone()),
737                ("uniswap-v3".to_string(), second_msg.clone()),
738            ]
739            .into_iter()
740            .collect(),
741            sync_states: [
742                ("uniswap-v3".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
743                ("uniswap-v2".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
744            ]
745            .into_iter()
746            .collect(),
747        };
748        assert_eq!(first_feed_msg, exp1);
749        assert_eq!(second_feed_msg, exp2);
750    }
751
752    #[test(tokio::test)]
753    async fn test_delayed_synchronizer_catches_up() {
754        let v2_sync = MockStateSync::new();
755        let v3_sync = MockStateSync::new();
756        let block_sync = BlockSynchronizer::new(
757            std::time::Duration::from_millis(500),
758            std::time::Duration::from_millis(50),
759            10,
760        )
761        .register_synchronizer(
762            ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
763            v2_sync.clone(),
764        )
765        .register_synchronizer(
766            ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
767            v3_sync.clone(),
768        );
769
770        // Initial messages - both synchronizers are at block 1
771        let block1_msg = StateSyncMessage {
772            header: Header {
773                number: 1,
774                hash: Bytes::from(vec![1]),
775                parent_hash: Bytes::from(vec![0]),
776                revert: false,
777            },
778            ..Default::default()
779        };
780        v2_sync
781            .send_header(block1_msg.clone())
782            .await;
783        v3_sync
784            .send_header(block1_msg.clone())
785            .await;
786
787        // Start the block synchronizer
788        let (_jh, mut rx) = block_sync
789            .run()
790            .await
791            .expect("BlockSynchronizer failed to start.");
792
793        // Consume the first message
794        let first_feed_msg = rx
795            .recv()
796            .await
797            .expect("header channel was closed");
798        assert_eq!(first_feed_msg.state_msgs.len(), 2);
799        assert!(matches!(
800            first_feed_msg
801                .sync_states
802                .get("uniswap-v2")
803                .unwrap(),
804            SynchronizerState::Ready(_)
805        ));
806        assert!(matches!(
807            first_feed_msg
808                .sync_states
809                .get("uniswap-v3")
810                .unwrap(),
811            SynchronizerState::Ready(_)
812        ));
813
814        // Send block 2 to v2 synchronizer only
815        let block2_msg = StateSyncMessage {
816            header: Header {
817                number: 2,
818                hash: Bytes::from(vec![2]),
819                parent_hash: Bytes::from(vec![1]),
820                revert: false,
821            },
822            ..Default::default()
823        };
824        v2_sync
825            .send_header(block2_msg.clone())
826            .await;
827
828        // Consume second message - v3 should be delayed
829        let second_feed_msg = rx
830            .recv()
831            .await
832            .expect("header channel was closed");
833        assert!(second_feed_msg
834            .state_msgs
835            .contains_key("uniswap-v2"));
836        assert!(matches!(
837            second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
838            SynchronizerState::Ready(header) if header.number == 2
839        ));
840        assert!(!second_feed_msg
841            .state_msgs
842            .contains_key("uniswap-v3"));
843        assert!(matches!(
844            second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
845            SynchronizerState::Delayed(header) if header.number == 1
846        ));
847
848        // Now v3 catches up to block 2
849        v3_sync
850            .send_header(block2_msg.clone())
851            .await;
852
853        // Both advance to block 3
854        let block3_msg = StateSyncMessage {
855            header: Header {
856                number: 3,
857                hash: Bytes::from(vec![3]),
858                parent_hash: Bytes::from(vec![2]),
859                revert: false,
860            },
861            ..Default::default()
862        };
863        v2_sync
864            .send_header(block3_msg.clone())
865            .await;
866        v3_sync.send_header(block3_msg).await;
867
868        // Consume third message - both should be on block 3
869        let third_feed_msg = rx
870            .recv()
871            .await
872            .expect("header channel was closed");
873        assert!(third_feed_msg
874            .state_msgs
875            .contains_key("uniswap-v2"));
876        assert!(third_feed_msg
877            .state_msgs
878            .contains_key("uniswap-v3"));
879        assert!(matches!(
880            third_feed_msg.sync_states.get("uniswap-v2").unwrap(),
881            SynchronizerState::Ready(header) if header.number == 3
882        ));
883        assert!(matches!(
884            third_feed_msg.sync_states.get("uniswap-v3").unwrap(),
885            SynchronizerState::Ready(header) if header.number == 3
886        ));
887    }
888
889    #[test(tokio::test)]
890    async fn test_different_start_blocks() {
891        let v2_sync = MockStateSync::new();
892        let v3_sync = MockStateSync::new();
893        let block_sync = BlockSynchronizer::new(
894            std::time::Duration::from_millis(500),
895            std::time::Duration::from_millis(50),
896            10,
897        )
898        .register_synchronizer(
899            ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
900            v2_sync.clone(),
901        )
902        .register_synchronizer(
903            ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
904            v3_sync.clone(),
905        );
906
907        // Initial messages - synchronizers at different blocks
908        let block1_msg = StateSyncMessage {
909            header: Header {
910                number: 1,
911                hash: Bytes::from(vec![1]),
912                parent_hash: Bytes::from(vec![0]),
913                revert: false,
914            },
915            ..Default::default()
916        };
917        let block2_msg = StateSyncMessage {
918            header: Header {
919                number: 2,
920                hash: Bytes::from(vec![2]),
921                parent_hash: Bytes::from(vec![1]),
922                revert: false,
923            },
924            ..Default::default()
925        };
926
927        v2_sync
928            .send_header(block1_msg.clone())
929            .await;
930        v3_sync
931            .send_header(block2_msg.clone())
932            .await;
933
934        // Start the block synchronizer - it should use block 2 as the starting block
935        let (_jh, mut rx) = block_sync
936            .run()
937            .await
938            .expect("BlockSynchronizer failed to start.");
939
940        // Consume first message
941        let first_feed_msg = rx
942            .recv()
943            .await
944            .expect("header channel was closed");
945        assert!(matches!(
946            first_feed_msg.sync_states.get("uniswap-v2").unwrap(),
947            SynchronizerState::Delayed(header) if header.number == 1
948        ));
949        assert!(matches!(
950            first_feed_msg.sync_states.get("uniswap-v3").unwrap(),
951            SynchronizerState::Ready(header) if header.number == 2
952        ));
953
954        // Now v2 catches up to block 2
955        v2_sync
956            .send_header(block2_msg.clone())
957            .await;
958
959        // Both advance to block 3
960        let block3_msg = StateSyncMessage {
961            header: Header {
962                number: 3,
963                hash: Bytes::from(vec![3]),
964                parent_hash: Bytes::from(vec![2]),
965                revert: false,
966            },
967            ..Default::default()
968        };
969        v2_sync
970            .send_header(block3_msg.clone())
971            .await;
972        v3_sync
973            .send_header(block3_msg.clone())
974            .await;
975
976        // Consume third message - both should be on block 3
977        let second_feed_msg = rx
978            .recv()
979            .await
980            .expect("header channel was closed");
981        assert_eq!(second_feed_msg.state_msgs.len(), 2);
982        assert!(matches!(
983            second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
984            SynchronizerState::Ready(header) if header.number == 3
985        ));
986        assert!(matches!(
987            second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
988            SynchronizerState::Ready(header) if header.number == 3
989        ));
990    }
991}