tycho_client/feed/
mod.rs

1use std::collections::{HashMap, HashSet};
2
3use chrono::{Duration as ChronoDuration, Local, NaiveDateTime};
4use futures03::{
5    future::{join_all, try_join_all},
6    stream::FuturesUnordered,
7    StreamExt,
8};
9use serde::{Deserialize, Serialize};
10use thiserror::Error;
11use tokio::{
12    select,
13    sync::mpsc::{self, Receiver},
14    task::JoinHandle,
15    time::timeout,
16};
17use tracing::{debug, error, info, trace, warn};
18use tycho_common::{
19    dto::{Block, ExtractorIdentity},
20    Bytes,
21};
22
23use crate::feed::{
24    block_history::{BlockHistory, BlockHistoryError, BlockPosition},
25    synchronizer::{StateSyncMessage, StateSynchronizer, SynchronizerError},
26};
27
28mod block_history;
29pub mod component_tracker;
30pub mod synchronizer;
31
32/// A trait representing a minimal interface for types that behave like a block header.
33///
34/// This abstraction allows working with either full block headers (`BlockHeader`)
35/// or simplified structures that only provide a timestamp (e.g., for RFQ logic).
36pub trait HeaderLike {
37    fn block(self) -> Option<BlockHeader>;
38    fn block_number_or_timestamp(self) -> u64;
39}
40
41#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, Eq, Hash)]
42pub struct BlockHeader {
43    pub hash: Bytes,
44    pub number: u64,
45    pub parent_hash: Bytes,
46    pub revert: bool,
47    pub timestamp: u64,
48}
49
50impl BlockHeader {
51    fn from_block(block: &Block, revert: bool) -> Self {
52        Self {
53            hash: block.hash.clone(),
54            number: block.number,
55            parent_hash: block.parent_hash.clone(),
56            revert,
57            timestamp: block.ts.timestamp() as u64,
58        }
59    }
60}
61
62impl HeaderLike for BlockHeader {
63    fn block(self) -> Option<BlockHeader> {
64        Some(self)
65    }
66
67    fn block_number_or_timestamp(self) -> u64 {
68        self.number
69    }
70}
71
72#[derive(Error, Debug)]
73pub enum BlockSynchronizerError {
74    #[error("Failed to initialize synchronizer: {0}")]
75    InitializationError(#[from] SynchronizerError),
76
77    #[error("Failed to process new block: {0}")]
78    BlockHistoryError(#[from] BlockHistoryError),
79
80    #[error("Not a single synchronizer was ready")]
81    NoReadySynchronizers,
82
83    #[error("No synchronizers were set")]
84    NoSynchronizers,
85
86    #[error("Failed to convert duration: {0}")]
87    DurationConversionError(String),
88}
89
90type BlockSyncResult<T> = Result<T, BlockSynchronizerError>;
91
92/// Aligns multiple StateSynchronizers on the block dimension.
93///
94/// ## Purpose
95/// The purpose of this component is to handle streams from multiple state synchronizers and
96/// align/merge them according to their blocks. Ideally this should be done in a fault-tolerant way,
97/// meaning we can recover from a state synchronizer suffering from timing issues. E.g. a delayed or
98/// unresponsive state synchronizer might recover again, or an advanced state synchronizer can be
99/// included again once we reach the block it is at.
100///
101/// ## Limitations
102/// - Supports only chains with fixed blocks time for now due to the lock step mechanism.
103///
104/// ## Initialisation
105/// Queries all registered synchronizers for their first message and evaluates the state of each
106/// synchronizer. If a synchronizer's first message is an older block, it is marked as delayed.
107/// If no message is received within the startup timeout, the synchronizer is marked as stale and is
108/// closed.
109///
110/// ## Main loop
111/// Once started, the synchronizers are queried concurrently for messages in lock step:
112/// the main loop queries all synchronizers in ready for the last emitted data, builds the
113/// `FeedMessage` and emits it, then it schedules the wait procedure for the next block.
114///
115/// ## Synchronization Logic
116///
117/// To classify a synchronizer as delayed, we need to first define the current block. The highest
118/// block number of all ready synchronizers is considered the current block.
119///
120/// Once we have the current block we can easily determine which block we expect next. And if a
121/// synchronizer delivers an older block we can classify it as delayed.
122///
123/// If any synchronizer is not in the ready state we will try to bring it back to the ready state.
124/// This is done by trying to empty any buffers of a delayed synchronizer or waiting to reach
125/// the height of an advanced synchronizer (and flagging it as such in the meantime).
126///
127/// Of course, we can't wait forever for a synchronizer to reply/recover. All of this must happen
128/// within the block production step of the blockchain:
129/// The wait procedure consists of waiting for any of the receivers to emit a new message (within a
130/// max timeout - several multiples of the block time). Once a message is received a very short
131/// timeout start for the remaining synchronizers, to deliver a message. Any synchronizer failing to
132/// do so is transitioned to delayed.
133///
134/// ### Note
135/// The described process above is the goal. It is currently not implemented like that. Instead we
136/// simply wait `block_time` + `wait_time`. Synchronizers are expected to respond within that
137/// timeout. This is simpler but only works well on chains with fixed block times.
138pub struct BlockSynchronizer<S> {
139    synchronizers: Option<HashMap<ExtractorIdentity, S>>,
140    block_time: std::time::Duration,
141    max_wait: std::time::Duration,
142    max_messages: Option<usize>,
143    max_missed_blocks: u64,
144}
145
146#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
147#[serde(tag = "status", rename_all = "lowercase")]
148pub enum SynchronizerState {
149    Started,
150    Ready(BlockHeader),
151    // no progress, we consider it stale at that point we should purge it.
152    Stale(BlockHeader),
153    Delayed(BlockHeader),
154    // For this to happen we must have a gap, and a gap usually means a new snapshot from the
155    // StateSynchronizer. This can only happen if we are processing too slow and one of the
156    // synchronizers restarts e.g. because Tycho ended the subscription.
157    Advanced(BlockHeader),
158    Ended,
159}
160
161pub struct SynchronizerStream {
162    extractor_id: ExtractorIdentity,
163    state: SynchronizerState,
164    modify_ts: NaiveDateTime,
165    rx: Receiver<StateSyncMessage<BlockHeader>>,
166}
167
168impl SynchronizerStream {
169    async fn try_advance(
170        &mut self,
171        block_history: &BlockHistory,
172        max_wait: std::time::Duration,
173        stale_threshold: std::time::Duration,
174    ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
175        let extractor_id = self.extractor_id.clone();
176        let latest_block = block_history.latest();
177        match &self.state {
178            SynchronizerState::Started | SynchronizerState::Ended => {
179                warn!(state=?&self.state, "Advancing Synchronizer in this state not supported!");
180                Ok(None)
181            }
182            SynchronizerState::Advanced(b) => {
183                let future_block = b.clone();
184                // Transition to ready once we arrived at the expected height
185                self.transition(future_block, block_history, stale_threshold)?;
186                Ok(None)
187            }
188            SynchronizerState::Ready(previous_block) => {
189                // Try to recv the next expected block, update state accordingly.
190                self.try_recv_next_expected(
191                    max_wait,
192                    block_history,
193                    previous_block.clone(),
194                    stale_threshold,
195                )
196                .await
197                // TODO: if we entered advanced state we need to buffer the message for a while.
198            }
199            SynchronizerState::Delayed(old_block) => {
200                // try to catch up all currently queued blocks until the expected block
201                debug!(
202                    ?old_block,
203                    ?latest_block,
204                    %extractor_id,
205                    "Trying to catch up to latest block"
206                );
207                self.try_catch_up(block_history, max_wait, stale_threshold)
208                    .await
209            }
210            SynchronizerState::Stale(old_block) => {
211                debug!(
212                    ?old_block,
213                    ?latest_block,
214                    %extractor_id,
215                    "Trying to catch up to latest block"
216                );
217                self.try_catch_up(block_history, max_wait, stale_threshold)
218                    .await
219            }
220        }
221    }
222
223    /// Standard way to advance a well-behaved state synchronizer.
224    ///
225    /// Will wait for a new block on the synchronizer within a timeout. And modify its state based
226    /// on the outcome.
227    async fn try_recv_next_expected(
228        &mut self,
229        max_wait: std::time::Duration,
230        block_history: &BlockHistory,
231        previous_block: BlockHeader,
232        stale_threshold: std::time::Duration,
233    ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
234        let extractor_id = self.extractor_id.clone();
235        match timeout(max_wait, self.rx.recv()).await {
236            Ok(Some(msg)) => {
237                self.transition(msg.header.clone(), block_history, stale_threshold)?;
238                Ok(Some(msg))
239            }
240            Ok(None) => {
241                error!(
242                    %extractor_id,
243                    ?previous_block,
244                    "Extractor terminated: channel closed!"
245                );
246                self.state = SynchronizerState::Ended;
247                self.modify_ts = Local::now().naive_utc();
248                Ok(None)
249            }
250            Err(_) => {
251                // trying to advance a block timed out
252                debug!(%extractor_id, ?previous_block, "Extractor did not check in within time.");
253                self.state = SynchronizerState::Delayed(previous_block.clone());
254                Ok(None)
255            }
256        }
257    }
258
259    /// Tries to catch up a delayed state synchronizer.
260    ///
261    /// If a synchronizer is delayed, this method will try to catch up to the next expected block
262    /// by consuming all waiting messages in its queue and waiting for any new block messages
263    /// within a timeout. Finally, all update messages are merged into one and returned.
264    async fn try_catch_up(
265        &mut self,
266        block_history: &BlockHistory,
267        max_wait: std::time::Duration,
268        stale_threshold: std::time::Duration,
269    ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
270        let mut results = Vec::new();
271        let extractor_id = self.extractor_id.clone();
272
273        // Set a deadline for the overall catch-up operation
274        let deadline = std::time::Instant::now() + max_wait;
275
276        while std::time::Instant::now() < deadline {
277            match timeout(
278                deadline.saturating_duration_since(std::time::Instant::now()),
279                self.rx.recv(),
280            )
281            .await
282            {
283                Ok(Some(msg)) => {
284                    debug!(%extractor_id, block_num=?msg.header.number, "Received new message during catch-up");
285                    let block_pos = block_history.determine_block_position(&msg.header)?;
286                    results.push(msg);
287                    if matches!(block_pos, BlockPosition::NextExpected) {
288                        break;
289                    }
290                }
291                Ok(None) => {
292                    warn!(%extractor_id, "Channel closed during catch-up");
293                    self.state = SynchronizerState::Ended;
294                    return Ok(None);
295                }
296                Err(_) => {
297                    debug!(%extractor_id, "Timed out waiting for catch-up");
298                    break;
299                }
300            }
301        }
302
303        let merged = results
304            .into_iter()
305            .reduce(|l, r| l.merge(r));
306
307        if let Some(msg) = merged {
308            // we were able to get at least one block out
309            debug!(?extractor_id, "Delayed extractor made progress!");
310            self.transition(msg.header.clone(), block_history, stale_threshold)?;
311            Ok(Some(msg))
312        } else {
313            Ok(None)
314        }
315    }
316
317    /// Logic to transition a state synchronizer based on newly received block
318    ///
319    /// Updates the synchronizer's state according to the position of the received block:
320    /// - Next expected block -> Ready state
321    /// - Latest/Delayed block -> Either Delayed or Stale (if >60s since last update)
322    /// - Advanced block -> Advanced state (block ahead of expected position)
323    fn transition(
324        &mut self,
325        latest_retrieved: BlockHeader,
326        block_history: &BlockHistory,
327        stale_threshold: std::time::Duration,
328    ) -> Result<(), BlockSynchronizerError> {
329        let extractor_id = self.extractor_id.clone();
330        let last_message_at = self.modify_ts;
331        let block = &latest_retrieved;
332
333        match block_history.determine_block_position(&latest_retrieved)? {
334            BlockPosition::NextExpected => {
335                self.state = SynchronizerState::Ready(latest_retrieved.clone());
336            }
337            BlockPosition::Latest | BlockPosition::Delayed => {
338                let now = Local::now().naive_utc();
339                let wait_duration = self
340                    .modify_ts
341                    .signed_duration_since(now);
342                let stale_threshold_chrono = ChronoDuration::from_std(stale_threshold)
343                    .map_err(|e| BlockSynchronizerError::DurationConversionError(e.to_string()))?;
344                if wait_duration > stale_threshold_chrono {
345                    warn!(
346                        ?extractor_id,
347                        ?last_message_at,
348                        ?block,
349                        "Extractor transition to stale."
350                    );
351                    self.state = SynchronizerState::Stale(latest_retrieved.clone());
352                } else {
353                    warn!(
354                        ?extractor_id,
355                        ?last_message_at,
356                        ?block,
357                        "Extractor transition to delayed."
358                    );
359                    self.state = SynchronizerState::Delayed(latest_retrieved.clone());
360                }
361            }
362            BlockPosition::Advanced => {
363                error!(
364                    ?extractor_id,
365                    ?last_message_at,
366                    ?block,
367                    "Extractor transition to advanced."
368                );
369                self.state = SynchronizerState::Advanced(latest_retrieved.clone());
370            }
371        }
372        self.modify_ts = Local::now().naive_utc();
373        Ok(())
374    }
375}
376
377#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
378pub struct FeedMessage<H>
379where
380    H: HeaderLike,
381{
382    pub state_msgs: HashMap<String, StateSyncMessage<H>>,
383    pub sync_states: HashMap<String, SynchronizerState>,
384}
385
386impl<H> FeedMessage<H>
387where
388    H: HeaderLike,
389{
390    fn new(
391        state_msgs: HashMap<String, StateSyncMessage<H>>,
392        sync_states: HashMap<String, SynchronizerState>,
393    ) -> Self {
394        Self { state_msgs, sync_states }
395    }
396}
397
398impl<S> BlockSynchronizer<S>
399where
400    S: StateSynchronizer,
401{
402    pub fn new(
403        block_time: std::time::Duration,
404        max_wait: std::time::Duration,
405        max_missed_blocks: u64,
406    ) -> Self {
407        Self { synchronizers: None, max_messages: None, block_time, max_wait, max_missed_blocks }
408    }
409
410    pub fn max_messages(&mut self, val: usize) {
411        self.max_messages = Some(val);
412    }
413
414    pub fn register_synchronizer(mut self, id: ExtractorIdentity, synchronizer: S) -> Self {
415        let mut registered = self.synchronizers.unwrap_or_default();
416        registered.insert(id, synchronizer);
417        self.synchronizers = Some(registered);
418        self
419    }
420    pub async fn run(
421        mut self,
422    ) -> BlockSyncResult<(JoinHandle<()>, Receiver<FeedMessage<BlockHeader>>)> {
423        trace!("Starting BlockSynchronizer...");
424        let mut state_sync_tasks = FuturesUnordered::new();
425        let mut synchronizers = self
426            .synchronizers
427            .take()
428            .ok_or(BlockSynchronizerError::NoSynchronizers)?;
429        // init synchronizers
430        let init_tasks = synchronizers
431            .values()
432            .map(|s| s.initialize())
433            .collect::<Vec<_>>();
434        try_join_all(init_tasks).await?;
435
436        let mut sync_streams = HashMap::with_capacity(synchronizers.len());
437        let mut sync_handles = Vec::new();
438        for (extractor_id, synchronizer) in synchronizers.drain() {
439            let (jh, rx) = synchronizer.start().await?;
440            state_sync_tasks.push(jh);
441            sync_handles.push(synchronizer);
442
443            sync_streams.insert(
444                extractor_id.clone(),
445                SynchronizerStream {
446                    extractor_id,
447                    state: SynchronizerState::Started,
448                    modify_ts: Local::now().naive_utc(),
449                    rx,
450                },
451            );
452        }
453
454        // startup, schedule first set of futures and wait for them to return to initialise
455        // synchronizers.
456        debug!("Waiting for initial synchronizer messages...");
457        let mut startup_futures = Vec::new();
458        for (id, sh) in sync_streams.iter_mut() {
459            let fut = async {
460                let res = timeout(self.block_time + self.max_wait, sh.rx.recv()).await;
461                (id.clone(), res)
462            };
463            startup_futures.push(fut);
464        }
465        let mut ready_sync_msgs = HashMap::new();
466        let initial_headers = join_all(startup_futures)
467            .await
468            .into_iter()
469            .filter_map(|(extractor_id, res)| {
470                let synchronizer = sync_streams
471                .get_mut(&extractor_id)
472                .unwrap();
473            match res {
474                Ok(Some(msg)) => {
475                    debug!(%extractor_id, height=?&msg.header.number, "Synchronizer started successfully!");
476                    // initially default all synchronizers to Ready
477                    synchronizer.state = SynchronizerState::Ready(msg.header.clone());
478                    synchronizer.modify_ts = Local::now().naive_utc();
479                    ready_sync_msgs.insert(extractor_id.name.clone(), msg.clone());
480                    Some(msg.header)
481                }
482                Ok(None) => {
483                    warn!(%extractor_id, "Dead synchronizer at startup will be purged!");
484                    synchronizer.state = SynchronizerState::Ended;
485                    synchronizer.modify_ts = Local::now().naive_utc();
486                    None
487                }
488                Err(_) => {
489                    warn!(%extractor_id, "Timed out waiting for first message: Stale synchronizer at startup will be purged!");
490                    synchronizer.state = SynchronizerState::Stale(BlockHeader::default());
491                    synchronizer.modify_ts = Local::now().naive_utc();
492                    None
493                }
494            }
495        })
496        .collect::<HashSet<_>>() // remove duplicates
497        .into_iter()
498        .collect::<Vec<_>>();
499
500        let mut block_history = BlockHistory::new(initial_headers, 15)?;
501
502        // Determine the starting header for synchronization
503        let start_header = block_history
504            .latest()
505            .ok_or(BlockSynchronizerError::NoReadySynchronizers)?;
506        info!(
507            start_block=?start_header,
508            n_healthy=?ready_sync_msgs.len(),
509            "Block synchronization started successfully!"
510        );
511
512        // Purge any stale synchronizers
513        // All synchronizers that did not timeout on start up are initialized as Ready, including
514        // those that are Delayed. Delayed synchronizers are identified and updated accordingly in
515        // the next step.
516        sync_streams.retain(|_, v| matches!(v.state, SynchronizerState::Ready(_)));
517
518        // Determine correct state for each remaining synchronizer, based on their header vs the
519        // latest one
520        for (_, stream) in sync_streams.iter_mut() {
521            if let SynchronizerState::Ready(header) = &stream.state.clone() {
522                if header.number < start_header.number {
523                    stream.state = SynchronizerState::Delayed(header.clone());
524                    debug!(
525                        extractor_id=%stream.extractor_id,
526                        synchronizer_block=?header.number,
527                        current_block=?start_header.number,
528                        "Marking synchronizer as delayed during initialization"
529                    );
530                }
531            }
532        }
533
534        let (sync_tx, sync_rx) = mpsc::channel(30);
535        let main_loop_jh: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
536            let mut n_iter = 1;
537            loop {
538                // Send retrieved data to receivers.
539                sync_tx
540                    .send(FeedMessage::new(
541                        std::mem::take(&mut ready_sync_msgs),
542                        sync_streams
543                            .iter()
544                            .map(|(a, b)| (a.name.to_string(), b.state.clone()))
545                            .collect(),
546                    ))
547                    .await?;
548
549                // Check if we have reached the max messages
550                if let Some(max_messages) = self.max_messages {
551                    if n_iter >= max_messages {
552                        info!(max_messages, "StreamEnd");
553                        return Ok(());
554                    }
555                }
556                n_iter += 1;
557
558                // Here we simply wait block_time + max_wait. This will not work for chains with
559                // unknown block times but is simple enough for now.
560                // If we would like to support unknown block times we could: Instruct all handles to
561                // await the max block time, if a header arrives within that time transition as
562                // usual, but via a select statement get notified (using e.g. Notify) if any other
563                // handle finishes before the timeout. Then await again but this time only for
564                // max_wait and then proceed as usual. So basically each try_advance task would have
565                // a select statement that allows it to exit the first timeout preemptively if any
566                // other try_advance task finished earlier.
567                let mut recv_futures = Vec::new();
568                for (extractor_id, sh) in sync_streams.iter_mut() {
569                    recv_futures.push(async {
570                        let res = sh
571                            .try_advance(
572                                &block_history,
573                                self.block_time + self.max_wait,
574                                self.block_time
575                                    .mul_f64(self.max_missed_blocks as f64),
576                            )
577                            .await?;
578                        Ok::<_, BlockSynchronizerError>(
579                            res.map(|msg| (extractor_id.name.clone(), msg)),
580                        )
581                    });
582                }
583                ready_sync_msgs.extend(
584                    join_all(recv_futures)
585                        .await
586                        .into_iter()
587                        .collect::<Result<Vec<_>, _>>()?
588                        .into_iter()
589                        .flatten(),
590                );
591
592                // Purge any bad synchronizers, respective warnings have already been issued at
593                // transition time.
594                sync_streams.retain(|_, v| match v.state {
595                    SynchronizerState::Started | SynchronizerState::Ended => false,
596                    SynchronizerState::Stale(_) => false,
597                    SynchronizerState::Ready(_) => true,
598                    SynchronizerState::Delayed(_) => true,
599                    SynchronizerState::Advanced(_) => true,
600                });
601
602                block_history.push(
603                    sync_streams
604                        .values()
605                        .filter_map(|v| match &v.state {
606                            SynchronizerState::Ready(b) => Some(b),
607                            _ => None,
608                        })
609                        .next()
610                        // no synchronizers are ready
611                        .ok_or(BlockSynchronizerError::NoReadySynchronizers)?
612                        .clone(),
613                )?;
614            }
615        });
616
617        let nanny_jh = tokio::spawn(async move {
618            select! {
619                error = state_sync_tasks.select_next_some() => {
620                    for s in sync_handles.iter_mut() {
621                        if let Err(e) = s.close().await {
622                            error!(error=?e, "Failed to close synchronizer: was not started!");
623                        }
624                    }
625                    error!(?error, "State synchronizer exited");
626                },
627                error = main_loop_jh => {
628                    error!(?error, "Feed main loop exited");
629                }
630            }
631        });
632        Ok((nanny_jh, sync_rx))
633    }
634}
635
636#[cfg(test)]
637mod tests {
638    use std::sync::Arc;
639
640    use async_trait::async_trait;
641    use test_log::test;
642    use tokio::sync::{oneshot, Mutex};
643    use tycho_common::dto::Chain;
644
645    use super::{synchronizer::SynchronizerError, *};
646    use crate::feed::synchronizer::SyncResult;
647
648    #[derive(Clone)]
649    struct MockStateSync {
650        header_tx: mpsc::Sender<StateSyncMessage<BlockHeader>>,
651        header_rx: Arc<Mutex<Option<Receiver<StateSyncMessage<BlockHeader>>>>>,
652        end_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
653    }
654
655    impl MockStateSync {
656        fn new() -> Self {
657            let (tx, rx) = mpsc::channel(1);
658            Self {
659                header_tx: tx,
660                header_rx: Arc::new(Mutex::new(Some(rx))),
661                end_tx: Arc::new(Mutex::new(None)),
662            }
663        }
664        async fn send_header(&self, header: StateSyncMessage<BlockHeader>) {
665            self.header_tx
666                .send(header)
667                .await
668                .expect("sending header failed");
669        }
670    }
671
672    #[async_trait]
673    impl StateSynchronizer for MockStateSync {
674        async fn initialize(&self) -> SyncResult<()> {
675            Ok(())
676        }
677
678        async fn start(
679            &self,
680        ) -> SyncResult<(JoinHandle<SyncResult<()>>, Receiver<StateSyncMessage<BlockHeader>>)>
681        {
682            let block_rx = {
683                let mut guard = self.header_rx.lock().await;
684                guard
685                    .take()
686                    .expect("Block receiver was not set!")
687            };
688
689            let end_rx = {
690                let (end_tx, end_rx) = oneshot::channel();
691                let mut guard = self.end_tx.lock().await;
692                *guard = Some(end_tx);
693                end_rx
694            };
695
696            let jh = tokio::spawn(async move {
697                let _ = end_rx.await;
698                SyncResult::Ok(())
699            });
700
701            Ok((jh, block_rx))
702        }
703
704        async fn close(&mut self) -> SyncResult<()> {
705            let mut guard = self.end_tx.lock().await;
706            if let Some(tx) = guard.take() {
707                tx.send(())
708                    .expect("end channel closed!");
709                Ok(())
710            } else {
711                Err(SynchronizerError::CloseError("Not Started".to_string()))
712            }
713        }
714    }
715
716    #[test(tokio::test)]
717    async fn test_two_ready_synchronizers() {
718        let v2_sync = MockStateSync::new();
719        let v3_sync = MockStateSync::new();
720        let block_sync = BlockSynchronizer::new(
721            std::time::Duration::from_millis(500),
722            std::time::Duration::from_millis(50),
723            10,
724        )
725        .register_synchronizer(
726            ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
727            v2_sync.clone(),
728        )
729        .register_synchronizer(
730            ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
731            v3_sync.clone(),
732        );
733        let start_msg = StateSyncMessage {
734            header: BlockHeader { number: 1, ..Default::default() },
735            ..Default::default()
736        };
737        v2_sync
738            .send_header(start_msg.clone())
739            .await;
740        v3_sync
741            .send_header(start_msg.clone())
742            .await;
743
744        let (_jh, mut rx) = block_sync
745            .run()
746            .await
747            .expect("BlockSynchronizer failed to start.");
748        let first_feed_msg = rx
749            .recv()
750            .await
751            .expect("header channel was closed");
752        let second_msg = StateSyncMessage {
753            header: BlockHeader { number: 2, ..Default::default() },
754            ..Default::default()
755        };
756        v2_sync
757            .send_header(second_msg.clone())
758            .await;
759        v3_sync
760            .send_header(second_msg.clone())
761            .await;
762        let second_feed_msg = rx
763            .recv()
764            .await
765            .expect("header channel was closed!");
766
767        let exp1 = FeedMessage {
768            state_msgs: [
769                ("uniswap-v2".to_string(), start_msg.clone()),
770                ("uniswap-v3".to_string(), start_msg.clone()),
771            ]
772            .into_iter()
773            .collect(),
774            sync_states: [
775                ("uniswap-v3".to_string(), SynchronizerState::Ready(start_msg.header.clone())),
776                ("uniswap-v2".to_string(), SynchronizerState::Ready(start_msg.header.clone())),
777            ]
778            .into_iter()
779            .collect(),
780        };
781        let exp2 = FeedMessage {
782            state_msgs: [
783                ("uniswap-v2".to_string(), second_msg.clone()),
784                ("uniswap-v3".to_string(), second_msg.clone()),
785            ]
786            .into_iter()
787            .collect(),
788            sync_states: [
789                ("uniswap-v3".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
790                ("uniswap-v2".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
791            ]
792            .into_iter()
793            .collect(),
794        };
795        assert_eq!(first_feed_msg, exp1);
796        assert_eq!(second_feed_msg, exp2);
797    }
798
799    #[test(tokio::test)]
800    async fn test_delayed_synchronizer_catches_up() {
801        let v2_sync = MockStateSync::new();
802        let v3_sync = MockStateSync::new();
803        let block_sync = BlockSynchronizer::new(
804            std::time::Duration::from_millis(500),
805            std::time::Duration::from_millis(50),
806            10,
807        )
808        .register_synchronizer(
809            ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
810            v2_sync.clone(),
811        )
812        .register_synchronizer(
813            ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
814            v3_sync.clone(),
815        );
816
817        // Initial messages - both synchronizers are at block 1
818        let block1_msg = StateSyncMessage {
819            header: BlockHeader {
820                number: 1,
821                hash: Bytes::from(vec![1]),
822                parent_hash: Bytes::from(vec![0]),
823                revert: false,
824                ..Default::default()
825            },
826            ..Default::default()
827        };
828        v2_sync
829            .send_header(block1_msg.clone())
830            .await;
831        v3_sync
832            .send_header(block1_msg.clone())
833            .await;
834
835        // Start the block synchronizer
836        let (_jh, mut rx) = block_sync
837            .run()
838            .await
839            .expect("BlockSynchronizer failed to start.");
840
841        // Consume the first message
842        let first_feed_msg = rx
843            .recv()
844            .await
845            .expect("header channel was closed");
846        assert_eq!(first_feed_msg.state_msgs.len(), 2);
847        assert!(matches!(
848            first_feed_msg
849                .sync_states
850                .get("uniswap-v2")
851                .unwrap(),
852            SynchronizerState::Ready(_)
853        ));
854        assert!(matches!(
855            first_feed_msg
856                .sync_states
857                .get("uniswap-v3")
858                .unwrap(),
859            SynchronizerState::Ready(_)
860        ));
861
862        // Send block 2 to v2 synchronizer only
863        let block2_msg = StateSyncMessage {
864            header: BlockHeader {
865                number: 2,
866                hash: Bytes::from(vec![2]),
867                parent_hash: Bytes::from(vec![1]),
868                revert: false,
869                ..Default::default()
870            },
871            ..Default::default()
872        };
873        v2_sync
874            .send_header(block2_msg.clone())
875            .await;
876
877        // Consume second message - v3 should be delayed
878        let second_feed_msg = rx
879            .recv()
880            .await
881            .expect("header channel was closed");
882        assert!(second_feed_msg
883            .state_msgs
884            .contains_key("uniswap-v2"));
885        assert!(matches!(
886            second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
887            SynchronizerState::Ready(header) if header.number == 2
888        ));
889        assert!(!second_feed_msg
890            .state_msgs
891            .contains_key("uniswap-v3"));
892        assert!(matches!(
893            second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
894            SynchronizerState::Delayed(header) if header.number == 1
895        ));
896
897        // Now v3 catches up to block 2
898        v3_sync
899            .send_header(block2_msg.clone())
900            .await;
901
902        // Both advance to block 3
903        let block3_msg = StateSyncMessage {
904            header: BlockHeader {
905                number: 3,
906                hash: Bytes::from(vec![3]),
907                parent_hash: Bytes::from(vec![2]),
908                revert: false,
909                ..Default::default()
910            },
911            ..Default::default()
912        };
913        v2_sync
914            .send_header(block3_msg.clone())
915            .await;
916        v3_sync.send_header(block3_msg).await;
917
918        // Consume third message - both should be on block 3
919        let third_feed_msg = rx
920            .recv()
921            .await
922            .expect("header channel was closed");
923        assert!(third_feed_msg
924            .state_msgs
925            .contains_key("uniswap-v2"));
926        assert!(third_feed_msg
927            .state_msgs
928            .contains_key("uniswap-v3"));
929        assert!(matches!(
930            third_feed_msg.sync_states.get("uniswap-v2").unwrap(),
931            SynchronizerState::Ready(header) if header.number == 3
932        ));
933        assert!(matches!(
934            third_feed_msg.sync_states.get("uniswap-v3").unwrap(),
935            SynchronizerState::Ready(header) if header.number == 3
936        ));
937    }
938
939    #[test(tokio::test)]
940    async fn test_different_start_blocks() {
941        let v2_sync = MockStateSync::new();
942        let v3_sync = MockStateSync::new();
943        let block_sync = BlockSynchronizer::new(
944            std::time::Duration::from_millis(500),
945            std::time::Duration::from_millis(50),
946            10,
947        )
948        .register_synchronizer(
949            ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
950            v2_sync.clone(),
951        )
952        .register_synchronizer(
953            ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
954            v3_sync.clone(),
955        );
956
957        // Initial messages - synchronizers at different blocks
958        let block1_msg = StateSyncMessage {
959            header: BlockHeader {
960                number: 1,
961                hash: Bytes::from(vec![1]),
962                parent_hash: Bytes::from(vec![0]),
963                revert: false,
964                ..Default::default()
965            },
966            ..Default::default()
967        };
968        let block2_msg = StateSyncMessage {
969            header: BlockHeader {
970                number: 2,
971                hash: Bytes::from(vec![2]),
972                parent_hash: Bytes::from(vec![1]),
973                revert: false,
974                ..Default::default()
975            },
976            ..Default::default()
977        };
978
979        v2_sync
980            .send_header(block1_msg.clone())
981            .await;
982        v3_sync
983            .send_header(block2_msg.clone())
984            .await;
985
986        // Start the block synchronizer - it should use block 2 as the starting block
987        let (_jh, mut rx) = block_sync
988            .run()
989            .await
990            .expect("BlockSynchronizer failed to start.");
991
992        // Consume first message
993        let first_feed_msg = rx
994            .recv()
995            .await
996            .expect("header channel was closed");
997        assert!(matches!(
998            first_feed_msg.sync_states.get("uniswap-v2").unwrap(),
999            SynchronizerState::Delayed(header) if header.number == 1
1000        ));
1001        assert!(matches!(
1002            first_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1003            SynchronizerState::Ready(header) if header.number == 2
1004        ));
1005
1006        // Now v2 catches up to block 2
1007        v2_sync
1008            .send_header(block2_msg.clone())
1009            .await;
1010
1011        // Both advance to block 3
1012        let block3_msg = StateSyncMessage {
1013            header: BlockHeader {
1014                number: 3,
1015                hash: Bytes::from(vec![3]),
1016                parent_hash: Bytes::from(vec![2]),
1017                revert: false,
1018                ..Default::default()
1019            },
1020            ..Default::default()
1021        };
1022        v2_sync
1023            .send_header(block3_msg.clone())
1024            .await;
1025        v3_sync
1026            .send_header(block3_msg.clone())
1027            .await;
1028
1029        // Consume third message - both should be on block 3
1030        let second_feed_msg = rx
1031            .recv()
1032            .await
1033            .expect("header channel was closed");
1034        assert_eq!(second_feed_msg.state_msgs.len(), 2);
1035        assert!(matches!(
1036            second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1037            SynchronizerState::Ready(header) if header.number == 3
1038        ));
1039        assert!(matches!(
1040            second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1041            SynchronizerState::Ready(header) if header.number == 3
1042        ));
1043    }
1044}