tycho_client/feed/
mod.rs

1use std::collections::{HashMap, HashSet};
2
3use chrono::{Duration as ChronoDuration, Local, NaiveDateTime};
4use futures03::{
5    future::{join_all, try_join_all},
6    stream::FuturesUnordered,
7    StreamExt,
8};
9use serde::{Deserialize, Serialize};
10use thiserror::Error;
11use tokio::{
12    select,
13    sync::mpsc::{self, Receiver},
14    task::JoinHandle,
15    time::timeout,
16};
17use tracing::{debug, error, info, trace, warn};
18use tycho_common::{
19    dto::{Block, ExtractorIdentity},
20    Bytes,
21};
22
23use crate::feed::{
24    block_history::{BlockHistory, BlockHistoryError, BlockPosition},
25    synchronizer::{StateSyncMessage, StateSynchronizer, SynchronizerError},
26};
27
28mod block_history;
29pub mod component_tracker;
30pub mod synchronizer;
31
/// Minimal description of a block as tracked by the feed.
///
/// Holds the identifying fields copied from a [`Block`] (see `Header::from_block`) together with
/// a `revert` flag.
#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, Eq, Hash)]
pub struct Header {
    /// Hash of the block.
    pub hash: Bytes,
    /// Number (height) of the block.
    pub number: u64,
    /// Hash of the block's parent.
    pub parent_hash: Bytes,
    /// NOTE(review): presumably set when the header was emitted as part of a chain revert —
    /// confirm against the synchronizer that produces it.
    pub revert: bool,
}
39
40impl Header {
41    fn from_block(block: &Block, revert: bool) -> Self {
42        Self {
43            hash: block.hash.clone(),
44            number: block.number,
45            parent_hash: block.parent_hash.clone(),
46            revert,
47        }
48    }
49}
50
/// Errors that can occur while starting or running a [`BlockSynchronizer`].
#[derive(Error, Debug)]
pub enum BlockSynchronizerError {
    /// A state synchronizer reported an error (converted from [`SynchronizerError`]), e.g. while
    /// being initialized or started.
    #[error("Failed to initialize synchronizer: {0}")]
    InitializationError(#[from] SynchronizerError),

    /// The block history rejected an operation (converted from [`BlockHistoryError`]).
    #[error("Failed to process new block: {0}")]
    BlockHistoryError(#[from] BlockHistoryError),

    /// No synchronizer provided a usable header: either none delivered a first message at
    /// startup, or none was in the `Ready` state after a round of the main loop.
    #[error("Not a single synchronizer was ready")]
    NoReadySynchronizers,

    /// `run` was called without any synchronizer having been registered.
    #[error("No synchronizers were set")]
    NoSynchronizers,

    /// A `std::time::Duration` could not be converted into a chrono duration; carries the
    /// conversion error's message.
    #[error("Failed to convert duration: {0}")]
    DurationConversionError(String),
}
68
/// Result alias for operations that may fail with a [`BlockSynchronizerError`].
type BlockSyncResult<T> = Result<T, BlockSynchronizerError>;
70
/// Aligns multiple StateSynchronizers on the block dimension.
///
/// ## Purpose
/// The purpose of this component is to handle streams from multiple state synchronizers and
/// align/merge them according to their blocks. Ideally this should be done in a fault-tolerant way,
/// meaning we can recover from a state synchronizer suffering from timing issues. E.g. a delayed or
/// unresponsive state synchronizer might recover again, or an advanced state synchronizer can be
/// included again once we reach the block it is at.
///
/// ## Limitations
/// - Supports only chains with fixed block times for now, due to the lock step mechanism.
///
/// ## Initialisation
/// Queries all registered synchronizers for their first message and evaluates the state of each
/// synchronizer. If a synchronizer's first message is an older block, it is marked as delayed.
/// If no message is received within the startup timeout, the synchronizer is marked as stale and
/// is purged (its stream is dropped).
///
/// ## Main loop
/// Once started, the synchronizers are queried concurrently for messages in lock step:
/// the main loop emits a `FeedMessage` built from the data collected in the previous round, then
/// queries all tracked synchronizers for their next message and purges the ones that ended or
/// went stale.
///
/// ## Synchronization Logic
///
/// To classify a synchronizer as delayed, we need to first define the current block. The highest
/// block number of all ready synchronizers is considered the current block.
///
/// Once we have the current block we can easily determine which block we expect next. And if a
/// synchronizer delivers an older block we can classify it as delayed.
///
/// If any synchronizer is not in the ready state we will try to bring it back to the ready state.
/// This is done by trying to empty any buffers of a delayed synchronizer or waiting to reach
/// the height of an advanced synchronizer (and flagging it as such in the meantime).
///
/// Of course, we can't wait forever for a synchronizer to reply/recover. All of this must happen
/// within the block production step of the blockchain:
/// The wait procedure consists of waiting for any of the receivers to emit a new message (within a
/// max timeout - several multiples of the block time). Once a message is received, a very short
/// timeout starts for the remaining synchronizers to deliver a message. Any synchronizer failing
/// to do so is transitioned to delayed.
///
/// ### Note
/// The process described above is the goal. It is currently not implemented like that. Instead we
/// simply wait `block_time` + `max_wait`. Synchronizers are expected to respond within that
/// timeout. This is simpler but only works well on chains with fixed block times.
pub struct BlockSynchronizer<S> {
    /// Registered synchronizers keyed by extractor; `None` until the first registration and
    /// taken out again by `run`.
    synchronizers: Option<HashMap<ExtractorIdentity, S>>,
    /// Expected block time; added to `max_wait` to form the per-round receive timeout.
    block_time: std::time::Duration,
    /// Extra time granted on top of `block_time` when waiting for a synchronizer.
    max_wait: std::time::Duration,
    /// If set, the main loop stops once this many feed messages were emitted.
    max_messages: Option<usize>,
    /// Multiplied by `block_time` to obtain the threshold passed to the streams for deciding
    /// whether a lagging synchronizer is stale.
    max_missed_blocks: u64,
}
124
/// State of a single synchronizer as tracked by the [`BlockSynchronizer`].
///
/// Serialized by serde with a `status` tag and lowercase variant names.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "lowercase")]
pub enum SynchronizerState {
    /// Initial state of a stream, before its first message has been evaluated.
    Started,
    /// The last retrieved header was the next expected block.
    Ready(Header),
    // no progress, we consider it stale at that point we should purge it.
    /// The synchronizer is lagging and exceeded the stale threshold (or never delivered a first
    /// message at startup); stale streams are purged by the main loop.
    Stale(Header),
    /// The last retrieved header is older than the expected block, or the synchronizer did not
    /// deliver a message in time.
    Delayed(Header),
    // For this to happen we must have a gap, and a gap usually means a new snapshot from the
    // StateSynchronizer. This can only happen if we are processing too slow and one of the
    // synchronizers restarts e.g. because Tycho ended the subscription.
    /// The last retrieved header is ahead of the expected block.
    Advanced(Header),
    /// The synchronizer's channel was closed; ended streams are purged by the main loop.
    Ended,
}
139
/// Receiving side of one state synchronizer together with its tracked state.
pub struct SynchronizerStream {
    /// Identity of the extractor this stream belongs to; used for logging.
    extractor_id: ExtractorIdentity,
    /// Current state of the synchronizer.
    state: SynchronizerState,
    /// Time of the last state modification (naive timestamp taken from `Local::now()`).
    modify_ts: NaiveDateTime,
    /// Channel on which the synchronizer delivers its messages.
    rx: Receiver<StateSyncMessage>,
}
146
147impl SynchronizerStream {
148    async fn try_advance(
149        &mut self,
150        block_history: &BlockHistory,
151        max_wait: std::time::Duration,
152        stale_threshold: std::time::Duration,
153    ) -> BlockSyncResult<Option<StateSyncMessage>> {
154        let extractor_id = self.extractor_id.clone();
155        let latest_block = block_history.latest();
156        match &self.state {
157            SynchronizerState::Started | SynchronizerState::Ended => {
158                warn!(state=?&self.state, "Advancing Synchronizer in this state not supported!");
159                Ok(None)
160            }
161            SynchronizerState::Advanced(b) => {
162                let future_block = b.clone();
163                // Transition to ready once we arrived at the expected height
164                self.transition(future_block, block_history, stale_threshold)?;
165                Ok(None)
166            }
167            SynchronizerState::Ready(previous_block) => {
168                // Try to recv the next expected block, update state accordingly.
169                self.try_recv_next_expected(
170                    max_wait,
171                    block_history,
172                    previous_block.clone(),
173                    stale_threshold,
174                )
175                .await
176                // TODO: if we entered advanced state we need to buffer the message for a while.
177            }
178            SynchronizerState::Delayed(old_block) => {
179                // try to catch up all currently queued blocks until the expected block
180                debug!(
181                    ?old_block,
182                    ?latest_block,
183                    %extractor_id,
184                    "Trying to catch up to latest block"
185                );
186                self.try_catch_up(block_history, max_wait, stale_threshold)
187                    .await
188            }
189            SynchronizerState::Stale(old_block) => {
190                debug!(
191                    ?old_block,
192                    ?latest_block,
193                    %extractor_id,
194                    "Trying to catch up to latest block"
195                );
196                self.try_catch_up(block_history, max_wait, stale_threshold)
197                    .await
198            }
199        }
200    }
201
202    /// Standard way to advance a well-behaved state synchronizer.
203    ///
204    /// Will wait for a new block on the synchronizer within a timeout. And modify its state based
205    /// on the outcome.
206    async fn try_recv_next_expected(
207        &mut self,
208        max_wait: std::time::Duration,
209        block_history: &BlockHistory,
210        previous_block: Header,
211        stale_threshold: std::time::Duration,
212    ) -> BlockSyncResult<Option<StateSyncMessage>> {
213        let extractor_id = self.extractor_id.clone();
214        match timeout(max_wait, self.rx.recv()).await {
215            Ok(Some(msg)) => {
216                self.transition(msg.header.clone(), block_history, stale_threshold)?;
217                Ok(Some(msg))
218            }
219            Ok(None) => {
220                error!(
221                    %extractor_id,
222                    ?previous_block,
223                    "Extractor terminated: channel closed!"
224                );
225                self.state = SynchronizerState::Ended;
226                self.modify_ts = Local::now().naive_utc();
227                Ok(None)
228            }
229            Err(_) => {
230                // trying to advance a block timed out
231                debug!(%extractor_id, ?previous_block, "Extractor did not check in within time.");
232                self.state = SynchronizerState::Delayed(previous_block.clone());
233                Ok(None)
234            }
235        }
236    }
237
238    /// Tries to catch up a delayed state synchronizer.
239    ///
240    /// If a synchronizer is delayed, this method will try to catch up to the next expected block
241    /// by consuming all waiting messages in its queue and waiting for any new block messages
242    /// within a timeout. Finally, all update messages are merged into one and returned.
243    async fn try_catch_up(
244        &mut self,
245        block_history: &BlockHistory,
246        max_wait: std::time::Duration,
247        stale_threshold: std::time::Duration,
248    ) -> BlockSyncResult<Option<StateSyncMessage>> {
249        let mut results = Vec::new();
250        let extractor_id = self.extractor_id.clone();
251
252        // Set a deadline for the overall catch-up operation
253        let deadline = std::time::Instant::now() + max_wait;
254
255        while std::time::Instant::now() < deadline {
256            match timeout(
257                deadline.saturating_duration_since(std::time::Instant::now()),
258                self.rx.recv(),
259            )
260            .await
261            {
262                Ok(Some(msg)) => {
263                    debug!(%extractor_id, block_num=?msg.header.number, "Received new message during catch-up");
264                    let block_pos = block_history.determine_block_position(&msg.header)?;
265                    results.push(msg);
266                    if matches!(block_pos, BlockPosition::NextExpected) {
267                        break;
268                    }
269                }
270                Ok(None) => {
271                    warn!(%extractor_id, "Channel closed during catch-up");
272                    self.state = SynchronizerState::Ended;
273                    return Ok(None);
274                }
275                Err(_) => {
276                    debug!(%extractor_id, "Timed out waiting for catch-up");
277                    break;
278                }
279            }
280        }
281
282        let merged = results
283            .into_iter()
284            .reduce(|l, r| l.merge(r));
285
286        if let Some(msg) = merged {
287            // we were able to get at least one block out
288            debug!(?extractor_id, "Delayed extractor made progress!");
289            self.transition(msg.header.clone(), block_history, stale_threshold)?;
290            Ok(Some(msg))
291        } else {
292            Ok(None)
293        }
294    }
295
296    /// Logic to transition a state synchronizer based on newly received block
297    ///
298    /// Updates the synchronizer's state according to the position of the received block:
299    /// - Next expected block -> Ready state
300    /// - Latest/Delayed block -> Either Delayed or Stale (if >60s since last update)
301    /// - Advanced block -> Advanced state (block ahead of expected position)
302    fn transition(
303        &mut self,
304        latest_retrieved: Header,
305        block_history: &BlockHistory,
306        stale_threshold: std::time::Duration,
307    ) -> Result<(), BlockSynchronizerError> {
308        let extractor_id = self.extractor_id.clone();
309        let last_message_at = self.modify_ts;
310        let block = &latest_retrieved;
311
312        match block_history.determine_block_position(&latest_retrieved)? {
313            BlockPosition::NextExpected => {
314                self.state = SynchronizerState::Ready(latest_retrieved.clone());
315            }
316            BlockPosition::Latest | BlockPosition::Delayed => {
317                let now = Local::now().naive_utc();
318                let wait_duration = self
319                    .modify_ts
320                    .signed_duration_since(now);
321                let stale_threshold_chrono = ChronoDuration::from_std(stale_threshold)
322                    .map_err(|e| BlockSynchronizerError::DurationConversionError(e.to_string()))?;
323                if wait_duration > stale_threshold_chrono {
324                    warn!(
325                        ?extractor_id,
326                        ?last_message_at,
327                        ?block,
328                        "Extractor transition to stale."
329                    );
330                    self.state = SynchronizerState::Stale(latest_retrieved.clone());
331                } else {
332                    warn!(
333                        ?extractor_id,
334                        ?last_message_at,
335                        ?block,
336                        "Extractor transition to delayed."
337                    );
338                    self.state = SynchronizerState::Delayed(latest_retrieved.clone());
339                }
340            }
341            BlockPosition::Advanced => {
342                error!(
343                    ?extractor_id,
344                    ?last_message_at,
345                    ?block,
346                    "Extractor transition to advanced."
347                );
348                self.state = SynchronizerState::Advanced(latest_retrieved.clone());
349            }
350        }
351        self.modify_ts = Local::now().naive_utc();
352        Ok(())
353    }
354}
355
/// Message emitted by the [`BlockSynchronizer`] once per round of its main loop.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct FeedMessage {
    /// Messages collected from the synchronizers, keyed by extractor name.
    pub state_msgs: HashMap<String, StateSyncMessage>,
    /// State of every tracked synchronizer at emission time, keyed by extractor name.
    pub sync_states: HashMap<String, SynchronizerState>,
}
361
362impl FeedMessage {
363    fn new(
364        state_msgs: HashMap<String, StateSyncMessage>,
365        sync_states: HashMap<String, SynchronizerState>,
366    ) -> Self {
367        Self { state_msgs, sync_states }
368    }
369}
370
371impl<S> BlockSynchronizer<S>
372where
373    S: StateSynchronizer,
374{
375    pub fn new(
376        block_time: std::time::Duration,
377        max_wait: std::time::Duration,
378        max_missed_blocks: u64,
379    ) -> Self {
380        Self { synchronizers: None, max_messages: None, block_time, max_wait, max_missed_blocks }
381    }
382
383    pub fn max_messages(&mut self, val: usize) {
384        self.max_messages = Some(val);
385    }
386
387    pub fn register_synchronizer(mut self, id: ExtractorIdentity, synchronizer: S) -> Self {
388        let mut registered = self.synchronizers.unwrap_or_default();
389        registered.insert(id, synchronizer);
390        self.synchronizers = Some(registered);
391        self
392    }
393    pub async fn run(mut self) -> BlockSyncResult<(JoinHandle<()>, Receiver<FeedMessage>)> {
394        trace!("Starting BlockSynchronizer...");
395        let mut state_sync_tasks = FuturesUnordered::new();
396        let mut synchronizers = self
397            .synchronizers
398            .take()
399            .ok_or(BlockSynchronizerError::NoSynchronizers)?;
400        // init synchronizers
401        let init_tasks = synchronizers
402            .values()
403            .map(|s| s.initialize())
404            .collect::<Vec<_>>();
405        try_join_all(init_tasks).await?;
406
407        let mut sync_streams = HashMap::with_capacity(synchronizers.len());
408        let mut sync_handles = Vec::new();
409        for (extractor_id, synchronizer) in synchronizers.drain() {
410            let (jh, rx) = synchronizer.start().await?;
411            state_sync_tasks.push(jh);
412            sync_handles.push(synchronizer);
413
414            sync_streams.insert(
415                extractor_id.clone(),
416                SynchronizerStream {
417                    extractor_id,
418                    state: SynchronizerState::Started,
419                    modify_ts: Local::now().naive_utc(),
420                    rx,
421                },
422            );
423        }
424
425        // startup, schedule first set of futures and wait for them to return to initialise
426        // synchronizers.
427        debug!("Waiting for initial synchronizer messages...");
428        let mut startup_futures = Vec::new();
429        for (id, sh) in sync_streams.iter_mut() {
430            let fut = async {
431                let res = timeout(self.block_time + self.max_wait, sh.rx.recv()).await;
432                (id.clone(), res)
433            };
434            startup_futures.push(fut);
435        }
436        let mut ready_sync_msgs = HashMap::new();
437        let initial_headers = join_all(startup_futures)
438            .await
439            .into_iter()
440            .filter_map(|(extractor_id, res)| {
441                let synchronizer = sync_streams
442                .get_mut(&extractor_id)
443                .unwrap();
444            match res {
445                Ok(Some(msg)) => {
446                    debug!(%extractor_id, height=?&msg.header.number, "Synchronizer started successfully!");
447                    // initially default all synchronizers to Ready
448                    synchronizer.state = SynchronizerState::Ready(msg.header.clone());
449                    synchronizer.modify_ts = Local::now().naive_utc();
450                    ready_sync_msgs.insert(extractor_id.name.clone(), msg.clone());
451                    Some(msg.header)
452                }
453                Ok(None) => {
454                    warn!(%extractor_id, "Dead synchronizer at startup will be purged!");
455                    synchronizer.state = SynchronizerState::Ended;
456                    synchronizer.modify_ts = Local::now().naive_utc();
457                    None
458                }
459                Err(_) => {
460                    warn!(%extractor_id, "Timed out waiting for first message: Stale synchronizer at startup will be purged!");
461                    synchronizer.state = SynchronizerState::Stale(Header::default());
462                    synchronizer.modify_ts = Local::now().naive_utc();
463                    None
464                }
465            }
466        })
467        .collect::<HashSet<_>>() // remove duplicates
468        .into_iter()
469        .collect::<Vec<_>>();
470
471        let mut block_history = BlockHistory::new(initial_headers, 15)?;
472
473        // Determine the starting header for synchronization
474        let start_header = block_history
475            .latest()
476            .ok_or(BlockSynchronizerError::NoReadySynchronizers)?;
477        info!(
478            start_block=?start_header,
479            n_healthy=?ready_sync_msgs.len(),
480            "Block synchronization started successfully!"
481        );
482
483        // Purge any stale synchronizers
484        // All synchronizers that did not timeout on start up are initialized as Ready, including
485        // those that are Delayed. Delayed synchronizers are identified and updated accordingly in
486        // the next step.
487        sync_streams.retain(|_, v| matches!(v.state, SynchronizerState::Ready(_)));
488
489        // Determine correct state for each remaining synchronizer, based on their header vs the
490        // latest one
491        for (_, stream) in sync_streams.iter_mut() {
492            if let SynchronizerState::Ready(header) = &stream.state.clone() {
493                if header.number < start_header.number {
494                    stream.state = SynchronizerState::Delayed(header.clone());
495                    debug!(
496                        extractor_id=%stream.extractor_id,
497                        synchronizer_block=?header.number,
498                        current_block=?start_header.number,
499                        "Marking synchronizer as delayed during initialization"
500                    );
501                }
502            }
503        }
504
505        let (sync_tx, sync_rx) = mpsc::channel(30);
506        let main_loop_jh: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
507            let mut n_iter = 1;
508            loop {
509                // Send retrieved data to receivers.
510                sync_tx
511                    .send(FeedMessage::new(
512                        std::mem::take(&mut ready_sync_msgs),
513                        sync_streams
514                            .iter()
515                            .map(|(a, b)| (a.name.to_string(), b.state.clone()))
516                            .collect(),
517                    ))
518                    .await?;
519
520                // Check if we have reached the max messages
521                if let Some(max_messages) = self.max_messages {
522                    if n_iter >= max_messages {
523                        info!(max_messages, "StreamEnd");
524                        return Ok(());
525                    }
526                }
527                n_iter += 1;
528
529                // Here we simply wait block_time + max_wait. This will not work for chains with
530                // unknown block times but is simple enough for now.
531                // If we would like to support unknown block times we could: Instruct all handles to
532                // await the max block time, if a header arrives within that time transition as
533                // usual, but via a select statement get notified (using e.g. Notify) if any other
534                // handle finishes before the timeout. Then await again but this time only for
535                // max_wait and then proceed as usual. So basically each try_advance task would have
536                // a select statement that allows it to exit the first timeout preemptively if any
537                // other try_advance task finished earlier.
538                let mut recv_futures = Vec::new();
539                for (extractor_id, sh) in sync_streams.iter_mut() {
540                    recv_futures.push(async {
541                        let res = sh
542                            .try_advance(
543                                &block_history,
544                                self.block_time + self.max_wait,
545                                self.block_time
546                                    .mul_f64(self.max_missed_blocks as f64),
547                            )
548                            .await?;
549                        Ok::<_, BlockSynchronizerError>(
550                            res.map(|msg| (extractor_id.name.clone(), msg)),
551                        )
552                    });
553                }
554                ready_sync_msgs.extend(
555                    join_all(recv_futures)
556                        .await
557                        .into_iter()
558                        .collect::<Result<Vec<_>, _>>()?
559                        .into_iter()
560                        .flatten(),
561                );
562
563                // Purge any bad synchronizers, respective warnings have already been issued at
564                // transition time.
565                sync_streams.retain(|_, v| match v.state {
566                    SynchronizerState::Started | SynchronizerState::Ended => false,
567                    SynchronizerState::Stale(_) => false,
568                    SynchronizerState::Ready(_) => true,
569                    SynchronizerState::Delayed(_) => true,
570                    SynchronizerState::Advanced(_) => true,
571                });
572
573                block_history.push(
574                    sync_streams
575                        .values()
576                        .filter_map(|v| match &v.state {
577                            SynchronizerState::Ready(b) => Some(b),
578                            _ => None,
579                        })
580                        .next()
581                        // no synchronizers are ready
582                        .ok_or(BlockSynchronizerError::NoReadySynchronizers)?
583                        .clone(),
584                )?;
585            }
586        });
587
588        let nanny_jh = tokio::spawn(async move {
589            select! {
590                error = state_sync_tasks.select_next_some() => {
591                    for s in sync_handles.iter_mut() {
592                        if let Err(e) = s.close().await {
593                            error!(error=?e, "Failed to close synchronizer: was not started!");
594                        }
595                    }
596                    error!(?error, "State synchronizer exited");
597                },
598                error = main_loop_jh => {
599                    error!(?error, "Feed main loop exited");
600                }
601            }
602        });
603        Ok((nanny_jh, sync_rx))
604    }
605}
606
607#[cfg(test)]
608mod tests {
609    use std::sync::Arc;
610
611    use async_trait::async_trait;
612    use test_log::test;
613    use tokio::sync::{oneshot, Mutex};
614    use tycho_common::dto::Chain;
615
616    use super::{synchronizer::SynchronizerError, *};
617    use crate::feed::synchronizer::SyncResult;
618
    /// Test double for a state synchronizer whose messages are injected by the test.
    #[derive(Clone)]
    struct MockStateSync {
        /// Sender used by the test to inject messages.
        header_tx: mpsc::Sender<StateSyncMessage>,
        /// Receiving end handed out by `start`; `None` once it was taken.
        header_rx: Arc<Mutex<Option<Receiver<StateSyncMessage>>>>,
        /// Signal used by `close` to end the task spawned in `start`; `None` until started.
        end_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
    }
625
626    impl MockStateSync {
627        fn new() -> Self {
628            let (tx, rx) = mpsc::channel(1);
629            Self {
630                header_tx: tx,
631                header_rx: Arc::new(Mutex::new(Some(rx))),
632                end_tx: Arc::new(Mutex::new(None)),
633            }
634        }
635        async fn send_header(&self, header: StateSyncMessage) {
636            self.header_tx
637                .send(header)
638                .await
639                .expect("sending header failed");
640        }
641    }
642
643    #[async_trait]
644    impl StateSynchronizer for MockStateSync {
645        async fn initialize(&self) -> SyncResult<()> {
646            Ok(())
647        }
648
649        async fn start(
650            &self,
651        ) -> SyncResult<(JoinHandle<SyncResult<()>>, Receiver<StateSyncMessage>)> {
652            let block_rx = {
653                let mut guard = self.header_rx.lock().await;
654                guard
655                    .take()
656                    .expect("Block receiver was not set!")
657            };
658
659            let end_rx = {
660                let (end_tx, end_rx) = oneshot::channel();
661                let mut guard = self.end_tx.lock().await;
662                *guard = Some(end_tx);
663                end_rx
664            };
665
666            let jh = tokio::spawn(async move {
667                let _ = end_rx.await;
668                SyncResult::Ok(())
669            });
670
671            Ok((jh, block_rx))
672        }
673
674        async fn close(&mut self) -> SyncResult<()> {
675            let mut guard = self.end_tx.lock().await;
676            if let Some(tx) = guard.take() {
677                tx.send(())
678                    .expect("end channel closed!");
679                Ok(())
680            } else {
681                Err(SynchronizerError::CloseError("Not Started".to_string()))
682            }
683        }
684    }
685
686    #[test(tokio::test)]
687    async fn test_two_ready_synchronizers() {
688        let v2_sync = MockStateSync::new();
689        let v3_sync = MockStateSync::new();
690        let block_sync = BlockSynchronizer::new(
691            std::time::Duration::from_millis(500),
692            std::time::Duration::from_millis(50),
693            10,
694        )
695        .register_synchronizer(
696            ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
697            v2_sync.clone(),
698        )
699        .register_synchronizer(
700            ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
701            v3_sync.clone(),
702        );
703        let start_msg = StateSyncMessage {
704            header: Header { number: 1, ..Default::default() },
705            ..Default::default()
706        };
707        v2_sync
708            .send_header(start_msg.clone())
709            .await;
710        v3_sync
711            .send_header(start_msg.clone())
712            .await;
713
714        let (_jh, mut rx) = block_sync
715            .run()
716            .await
717            .expect("BlockSynchronizer failed to start.");
718        let first_feed_msg = rx
719            .recv()
720            .await
721            .expect("header channel was closed");
722        let second_msg = StateSyncMessage {
723            header: Header { number: 2, ..Default::default() },
724            ..Default::default()
725        };
726        v2_sync
727            .send_header(second_msg.clone())
728            .await;
729        v3_sync
730            .send_header(second_msg.clone())
731            .await;
732        let second_feed_msg = rx
733            .recv()
734            .await
735            .expect("header channel was closed!");
736
737        let exp1 = FeedMessage {
738            state_msgs: [
739                ("uniswap-v2".to_string(), start_msg.clone()),
740                ("uniswap-v3".to_string(), start_msg.clone()),
741            ]
742            .into_iter()
743            .collect(),
744            sync_states: [
745                ("uniswap-v3".to_string(), SynchronizerState::Ready(start_msg.header.clone())),
746                ("uniswap-v2".to_string(), SynchronizerState::Ready(start_msg.header.clone())),
747            ]
748            .into_iter()
749            .collect(),
750        };
751        let exp2 = FeedMessage {
752            state_msgs: [
753                ("uniswap-v2".to_string(), second_msg.clone()),
754                ("uniswap-v3".to_string(), second_msg.clone()),
755            ]
756            .into_iter()
757            .collect(),
758            sync_states: [
759                ("uniswap-v3".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
760                ("uniswap-v2".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
761            ]
762            .into_iter()
763            .collect(),
764        };
765        assert_eq!(first_feed_msg, exp1);
766        assert_eq!(second_feed_msg, exp2);
767    }
768
769    #[test(tokio::test)]
770    async fn test_delayed_synchronizer_catches_up() {
771        let v2_sync = MockStateSync::new();
772        let v3_sync = MockStateSync::new();
773        let block_sync = BlockSynchronizer::new(
774            std::time::Duration::from_millis(500),
775            std::time::Duration::from_millis(50),
776            10,
777        )
778        .register_synchronizer(
779            ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
780            v2_sync.clone(),
781        )
782        .register_synchronizer(
783            ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
784            v3_sync.clone(),
785        );
786
787        // Initial messages - both synchronizers are at block 1
788        let block1_msg = StateSyncMessage {
789            header: Header {
790                number: 1,
791                hash: Bytes::from(vec![1]),
792                parent_hash: Bytes::from(vec![0]),
793                revert: false,
794            },
795            ..Default::default()
796        };
797        v2_sync
798            .send_header(block1_msg.clone())
799            .await;
800        v3_sync
801            .send_header(block1_msg.clone())
802            .await;
803
804        // Start the block synchronizer
805        let (_jh, mut rx) = block_sync
806            .run()
807            .await
808            .expect("BlockSynchronizer failed to start.");
809
810        // Consume the first message
811        let first_feed_msg = rx
812            .recv()
813            .await
814            .expect("header channel was closed");
815        assert_eq!(first_feed_msg.state_msgs.len(), 2);
816        assert!(matches!(
817            first_feed_msg
818                .sync_states
819                .get("uniswap-v2")
820                .unwrap(),
821            SynchronizerState::Ready(_)
822        ));
823        assert!(matches!(
824            first_feed_msg
825                .sync_states
826                .get("uniswap-v3")
827                .unwrap(),
828            SynchronizerState::Ready(_)
829        ));
830
831        // Send block 2 to v2 synchronizer only
832        let block2_msg = StateSyncMessage {
833            header: Header {
834                number: 2,
835                hash: Bytes::from(vec![2]),
836                parent_hash: Bytes::from(vec![1]),
837                revert: false,
838            },
839            ..Default::default()
840        };
841        v2_sync
842            .send_header(block2_msg.clone())
843            .await;
844
845        // Consume second message - v3 should be delayed
846        let second_feed_msg = rx
847            .recv()
848            .await
849            .expect("header channel was closed");
850        assert!(second_feed_msg
851            .state_msgs
852            .contains_key("uniswap-v2"));
853        assert!(matches!(
854            second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
855            SynchronizerState::Ready(header) if header.number == 2
856        ));
857        assert!(!second_feed_msg
858            .state_msgs
859            .contains_key("uniswap-v3"));
860        assert!(matches!(
861            second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
862            SynchronizerState::Delayed(header) if header.number == 1
863        ));
864
865        // Now v3 catches up to block 2
866        v3_sync
867            .send_header(block2_msg.clone())
868            .await;
869
870        // Both advance to block 3
871        let block3_msg = StateSyncMessage {
872            header: Header {
873                number: 3,
874                hash: Bytes::from(vec![3]),
875                parent_hash: Bytes::from(vec![2]),
876                revert: false,
877            },
878            ..Default::default()
879        };
880        v2_sync
881            .send_header(block3_msg.clone())
882            .await;
883        v3_sync.send_header(block3_msg).await;
884
885        // Consume third message - both should be on block 3
886        let third_feed_msg = rx
887            .recv()
888            .await
889            .expect("header channel was closed");
890        assert!(third_feed_msg
891            .state_msgs
892            .contains_key("uniswap-v2"));
893        assert!(third_feed_msg
894            .state_msgs
895            .contains_key("uniswap-v3"));
896        assert!(matches!(
897            third_feed_msg.sync_states.get("uniswap-v2").unwrap(),
898            SynchronizerState::Ready(header) if header.number == 3
899        ));
900        assert!(matches!(
901            third_feed_msg.sync_states.get("uniswap-v3").unwrap(),
902            SynchronizerState::Ready(header) if header.number == 3
903        ));
904    }
905
906    #[test(tokio::test)]
907    async fn test_different_start_blocks() {
908        let v2_sync = MockStateSync::new();
909        let v3_sync = MockStateSync::new();
910        let block_sync = BlockSynchronizer::new(
911            std::time::Duration::from_millis(500),
912            std::time::Duration::from_millis(50),
913            10,
914        )
915        .register_synchronizer(
916            ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
917            v2_sync.clone(),
918        )
919        .register_synchronizer(
920            ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
921            v3_sync.clone(),
922        );
923
924        // Initial messages - synchronizers at different blocks
925        let block1_msg = StateSyncMessage {
926            header: Header {
927                number: 1,
928                hash: Bytes::from(vec![1]),
929                parent_hash: Bytes::from(vec![0]),
930                revert: false,
931            },
932            ..Default::default()
933        };
934        let block2_msg = StateSyncMessage {
935            header: Header {
936                number: 2,
937                hash: Bytes::from(vec![2]),
938                parent_hash: Bytes::from(vec![1]),
939                revert: false,
940            },
941            ..Default::default()
942        };
943
944        v2_sync
945            .send_header(block1_msg.clone())
946            .await;
947        v3_sync
948            .send_header(block2_msg.clone())
949            .await;
950
951        // Start the block synchronizer - it should use block 2 as the starting block
952        let (_jh, mut rx) = block_sync
953            .run()
954            .await
955            .expect("BlockSynchronizer failed to start.");
956
957        // Consume first message
958        let first_feed_msg = rx
959            .recv()
960            .await
961            .expect("header channel was closed");
962        assert!(matches!(
963            first_feed_msg.sync_states.get("uniswap-v2").unwrap(),
964            SynchronizerState::Delayed(header) if header.number == 1
965        ));
966        assert!(matches!(
967            first_feed_msg.sync_states.get("uniswap-v3").unwrap(),
968            SynchronizerState::Ready(header) if header.number == 2
969        ));
970
971        // Now v2 catches up to block 2
972        v2_sync
973            .send_header(block2_msg.clone())
974            .await;
975
976        // Both advance to block 3
977        let block3_msg = StateSyncMessage {
978            header: Header {
979                number: 3,
980                hash: Bytes::from(vec![3]),
981                parent_hash: Bytes::from(vec![2]),
982                revert: false,
983            },
984            ..Default::default()
985        };
986        v2_sync
987            .send_header(block3_msg.clone())
988            .await;
989        v3_sync
990            .send_header(block3_msg.clone())
991            .await;
992
993        // Consume third message - both should be on block 3
994        let second_feed_msg = rx
995            .recv()
996            .await
997            .expect("header channel was closed");
998        assert_eq!(second_feed_msg.state_msgs.len(), 2);
999        assert!(matches!(
1000            second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1001            SynchronizerState::Ready(header) if header.number == 3
1002        ));
1003        assert!(matches!(
1004            second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1005            SynchronizerState::Ready(header) if header.number == 3
1006        ));
1007    }
1008}