tycho_client/feed/mod.rs

1use std::{
2    collections::{HashMap, HashSet},
3    fmt::{Display, Formatter},
4    time::Duration,
5};
6
7use chrono::{Duration as ChronoDuration, Local, NaiveDateTime};
8use futures03::{
9    future::{join_all, try_join_all},
10    stream::FuturesUnordered,
11    StreamExt,
12};
13use serde::{Deserialize, Serialize};
14use thiserror::Error;
15use tokio::{
16    sync::{
17        mpsc::{self, Receiver},
18        oneshot,
19    },
20    task::JoinHandle,
21    time::timeout,
22};
23use tracing::{debug, error, info, trace, warn};
24use tycho_common::{
25    display::opt,
26    dto::{Block, ExtractorIdentity},
27    Bytes,
28};
29
30use crate::feed::{
31    block_history::{BlockHistory, BlockHistoryError, BlockPosition},
32    synchronizer::{StateSyncMessage, StateSynchronizer, SyncResult, SynchronizerError},
33};
34
35mod block_history;
36pub mod component_tracker;
37pub mod synchronizer;
38
39/// A trait representing a minimal interface for types that behave like a block header.
40///
41/// This abstraction allows working with either full block headers (`BlockHeader`)
42/// or simplified structures that only provide a timestamp (e.g., for RFQ logic).
43pub trait HeaderLike {
44    fn block(self) -> Option<BlockHeader>;
45    fn block_number_or_timestamp(self) -> u64;
46}
47
48#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, Eq, Hash)]
49pub struct BlockHeader {
50    pub hash: Bytes,
51    pub number: u64,
52    pub parent_hash: Bytes,
53    pub revert: bool,
54    pub timestamp: u64,
55}
56
57impl Display for BlockHeader {
58    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59        // Take first 6 hex chars of the hash for readability
60        let short_hash = if self.hash.len() >= 4 {
61            hex::encode(&self.hash[..4]) // 4 bytes → 8 hex chars
62        } else {
63            hex::encode(&self.hash)
64        };
65
66        write!(f, "Block #{} [0x{}..]", self.number, short_hash)
67    }
68}
69
70impl BlockHeader {
71    fn from_block(block: &Block, revert: bool) -> Self {
72        Self {
73            hash: block.hash.clone(),
74            number: block.number,
75            parent_hash: block.parent_hash.clone(),
76            revert,
77            timestamp: block.ts.and_utc().timestamp() as u64,
78        }
79    }
80}
81
82impl HeaderLike for BlockHeader {
83    fn block(self) -> Option<BlockHeader> {
84        Some(self)
85    }
86
87    fn block_number_or_timestamp(self) -> u64 {
88        self.number
89    }
90}
91
92#[derive(Error, Debug)]
93pub enum BlockSynchronizerError {
94    #[error("Failed to initialize synchronizer: {0}")]
95    InitializationError(#[from] SynchronizerError),
96
97    #[error("Failed to process new block: {0}")]
98    BlockHistoryError(#[from] BlockHistoryError),
99
100    #[error("Not a single synchronizer was ready: {0}")]
101    NoReadySynchronizers(String),
102
103    #[error("No synchronizers were set")]
104    NoSynchronizers,
105
106    #[error("Failed to convert duration: {0}")]
107    DurationConversionError(String),
108}
109
110type BlockSyncResult<T> = Result<T, BlockSynchronizerError>;
111
112/// Aligns multiple StateSynchronizers on the block dimension.
113///
114/// ## Purpose
115/// The purpose of this component is to handle streams from multiple state synchronizers and
116/// align/merge them according to their blocks. Ideally this should be done in a fault-tolerant way,
117/// meaning we can recover from a state synchronizer suffering from timing issues. E.g. a delayed or
118/// unresponsive state synchronizer might recover again, or an advanced state synchronizer can be
119/// included again once we reach the block it is at.
120///
121/// ## Limitations
122/// - Supports only chains with fixed blocks time for now due to the lock step mechanism.
123///
124/// ## Initialisation
125/// Queries all registered synchronizers for their first message and evaluates the state of each
126/// synchronizer. If a synchronizer's first message is an older block, it is marked as delayed.
127// TODO: what is the startup timeout
128/// If no message is received within the startup timeout, the synchronizer is marked as stale and is
129/// closed.
130///
131/// ## Main loop
132/// Once started, the synchronizers are queried concurrently for messages in lock step:
133/// the main loop queries all synchronizers in ready for the last emitted data, builds the
134/// `FeedMessage` and emits it, then it schedules the wait procedure for the next block.
135///
136/// ## Synchronization Logic
137///
138/// To classify a synchronizer as delayed, we need to first define the current block. The highest
139/// block number of all ready synchronizers is considered the current block.
140///
141/// Once we have the current block we can easily determine which block we expect next. And if a
142/// synchronizer delivers an older block we can classify it as delayed.
143///
144/// If any synchronizer is not in the ready state we will try to bring it back to the ready state.
145/// This is done by trying to empty any buffers of a delayed synchronizer or waiting to reach
146/// the height of an advanced synchronizer (and flagging it as such in the meantime).
147///
148/// Of course, we can't wait forever for a synchronizer to reply/recover. All of this must happen
149/// within the block production step of the blockchain:
150/// The wait procedure consists of waiting for any of the individual ProtocolStateSynchronizers
151/// to emit a new message (within a max timeout - several multiples of the block time). Once a
152/// message is received a very short timeout starts for the remaining synchronizers, to deliver a
153/// message. Any synchronizer failing to do so is transitioned to delayed.
154///
155/// ### Note
156/// The described process above is the goal. It is currently not implemented like that. Instead we
157/// simply wait `block_time` + `wait_time`. Synchronizers are expected to respond within that
158/// timeout. This is simpler but only works well on chains with fixed block times.
159pub struct BlockSynchronizer<S> {
160    synchronizers: Option<HashMap<ExtractorIdentity, S>>,
161    /// Time to wait for a block usually
162    block_time: std::time::Duration,
163    /// Added on top of block time to account for latency
164    latency_buffer: std::time::Duration,
165    /// Time to wait for the full first message, including snapshot retrieval
166    startup_timeout: std::time::Duration,
167    /// Optionally, end the stream after emitting max messages
168    max_messages: Option<usize>,
169    /// Amount of blocks a protocol can be delayed for, before it is considered stale
170    max_missed_blocks: u64,
171}
172
173#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
174#[serde(tag = "status", rename_all = "lowercase")]
175pub enum SynchronizerState {
176    /// Initial state, assigned before trying to receive any message
177    Started,
178    /// The synchronizer emitted a message for the block as expected
179    Ready(BlockHeader),
180    /// The synchronizer is on a previous block compared to others and is expected to
181    /// catch up soon.
182    Delayed(BlockHeader),
183    /// The synchronizer hasn't emitted messages for > `max_missed_blocks` or has
184    /// fallen far behind. At this point we do not wait for it anymore but it can
185    /// still eventually recover.
186    Stale(BlockHeader),
187    /// The synchronizer is on future not connected block.
188    // For this to happen we must have a gap, and a gap usually means a new snapshot from the
189    // StateSynchronizer. This can only happen if we are processing too slow and one or all of the
190    // synchronizers restarts e.g. due to websocket connection drops.
191    Advanced(BlockHeader),
192    /// The synchronizer ended with an error.
193    Ended(String),
194}
195
196impl Display for SynchronizerState {
197    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
198        match self {
199            SynchronizerState::Started => write!(f, "Started"),
200            SynchronizerState::Ready(b) => write!(f, "Started({})", b.number),
201            SynchronizerState::Delayed(b) => write!(f, "Delayed({})", b.number),
202            SynchronizerState::Stale(b) => write!(f, "Stale({})", b.number),
203            SynchronizerState::Advanced(b) => write!(f, "Advanced({})", b.number),
204            SynchronizerState::Ended(reason) => write!(f, "Ended({})", reason),
205        }
206    }
207}
208
209pub struct SynchronizerStream {
210    extractor_id: ExtractorIdentity,
211    state: SynchronizerState,
212    error: Option<SynchronizerError>,
213    modify_ts: NaiveDateTime,
214    rx: Receiver<SyncResult<StateSyncMessage<BlockHeader>>>,
215}
216
217impl SynchronizerStream {
218    fn new(
219        extractor_id: &ExtractorIdentity,
220        rx: Receiver<SyncResult<StateSyncMessage<BlockHeader>>>,
221    ) -> Self {
222        Self {
223            extractor_id: extractor_id.clone(),
224            state: SynchronizerState::Started,
225            error: None,
226            modify_ts: Local::now().naive_utc(),
227            rx,
228        }
229    }
230    async fn try_advance(
231        &mut self,
232        block_history: &BlockHistory,
233        block_time: std::time::Duration,
234        latency_buffer: std::time::Duration,
235        stale_threshold: std::time::Duration,
236    ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
237        let extractor_id = self.extractor_id.clone();
238        let latest_block = block_history.latest();
239
240        match &self.state {
241            SynchronizerState::Started | SynchronizerState::Ended(_) => {
242                warn!(state=?&self.state, "Advancing Synchronizer in this state not supported!");
243                Ok(None)
244            }
245            SynchronizerState::Advanced(b) => {
246                let future_block = b.clone();
247                // Transition to ready once we arrived at the expected height
248                self.transition(future_block, block_history, stale_threshold)?;
249                Ok(None)
250            }
251            SynchronizerState::Ready(previous_block) => {
252                // Try to recv the next expected block, update state accordingly.
253                self.try_recv_next_expected(
254                    block_time + latency_buffer,
255                    block_history,
256                    previous_block.clone(),
257                    stale_threshold,
258                )
259                .await
260            }
261            SynchronizerState::Delayed(old_block) => {
262                // try to catch up all currently queued blocks until the expected block
263                debug!(
264                    %old_block,
265                    latest_block=opt(&latest_block),
266                    %extractor_id,
267                    "Trying to catch up to latest block"
268                );
269                self.try_catch_up(block_history, block_time + latency_buffer, stale_threshold)
270                    .await
271            }
272            SynchronizerState::Stale(old_block) => {
273                // try to catch up all currently queued blocks until the expected block
274                debug!(
275                    %old_block,
276                    latest_block=opt(&latest_block),
277                    %extractor_id,
278                    "Trying to catch up stale synchronizer to latest block"
279                );
280                self.try_catch_up(block_history, block_time, stale_threshold)
281                    .await
282            }
283        }
284    }
285
286    /// Standard way to advance a well-behaved state synchronizer.
287    ///
288    /// Will wait for a new block on the synchronizer within a timeout. And modify its
289    /// state based on the outcome.
290    ///
291    /// ## Note
292    /// This method assumes that the current state is `Ready`.
293    async fn try_recv_next_expected(
294        &mut self,
295        max_wait: std::time::Duration,
296        block_history: &BlockHistory,
297        previous_block: BlockHeader,
298        stale_threshold: std::time::Duration,
299    ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
300        let extractor_id = self.extractor_id.clone();
301        match timeout(max_wait, self.rx.recv()).await {
302            Ok(Some(Ok(msg))) => {
303                self.transition(msg.header.clone(), block_history, stale_threshold)?;
304                Ok(Some(msg))
305            }
306            Ok(Some(Err(e))) => {
307                // The underlying synchronizer exhausted its retries
308                self.mark_errored(e);
309                Ok(None)
310            }
311            Ok(None) => {
312                // This case should not happen, as we shouldn't poll the synchronizer after we
313                // closed it or after it errored.
314                warn!(
315                    %extractor_id,
316                    "Tried to poll from closed synchronizer.",
317                );
318                self.mark_closed();
319                Ok(None)
320            }
321            Err(_) => {
322                // trying to advance a block timed out
323                debug!(%extractor_id, %previous_block, "No block received within time limit.");
324                // No need to consider state since we only call this method if we are
325                // in the Ready state.
326                self.state = SynchronizerState::Delayed(previous_block.clone());
327                self.modify_ts = Local::now().naive_utc();
328                Ok(None)
329            }
330        }
331    }
332
333    /// Tries to catch up a delayed state synchronizer.
334    ///
335    /// If a synchronizer is delayed, this method will try to catch up to the next expected block
336    /// by consuming all waiting messages in its queue and waiting for any new block messages
337    /// within a timeout. Finally, all update messages are merged into one and returned.
338    async fn try_catch_up(
339        &mut self,
340        block_history: &BlockHistory,
341        max_wait: std::time::Duration,
342        stale_threshold: std::time::Duration,
343    ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
344        let mut results = Vec::new();
345        let extractor_id = self.extractor_id.clone();
346
347        // Set a deadline for the overall catch-up operation
348        let deadline = std::time::Instant::now() + max_wait;
349
350        while std::time::Instant::now() < deadline {
351            match timeout(
352                deadline.saturating_duration_since(std::time::Instant::now()),
353                self.rx.recv(),
354            )
355            .await
356            {
357                Ok(Some(Ok(msg))) => {
358                    debug!(%extractor_id, block=%msg.header, "Received new message during catch-up");
359                    let block_pos = block_history.determine_block_position(&msg.header)?;
360                    results.push(msg);
361                    if matches!(block_pos, BlockPosition::NextExpected) {
362                        break;
363                    }
364                }
365                Ok(Some(Err(e))) => {
366                    // Synchronizer errored during catch up
367                    self.mark_errored(e);
368                    return Ok(None);
369                }
370                Ok(None) => {
371                    // This case should not happen, as we shouldn't poll the synchronizer after we
372                    // closed it or after it errored.
373                    warn!(
374                        %extractor_id,
375                        "Tried to poll from closed synchronizer during catch up.",
376                    );
377                    self.mark_closed();
378                    return Ok(None);
379                }
380                Err(_) => {
381                    debug!(%extractor_id, "Timed out waiting for catch-up");
382                    break;
383                }
384            }
385        }
386
387        let merged = results
388            .into_iter()
389            .reduce(|l, r| l.merge(r));
390
391        if let Some(msg) = merged {
392            // we were able to get at least one block out
393            debug!(%extractor_id, "Delayed extractor made progress!");
394            self.transition(msg.header.clone(), block_history, stale_threshold)?;
395            Ok(Some(msg))
396        } else {
397            // No progress made during catch-up, check if we should go stale
398            self.check_and_transition_to_stale_if_needed(stale_threshold, None)?;
399            Ok(None)
400        }
401    }
402
403    /// Helper method to check if synchronizer should transition to stale based on time elapsed
404    fn check_and_transition_to_stale_if_needed(
405        &mut self,
406        stale_threshold: std::time::Duration,
407        fallback_header: Option<BlockHeader>,
408    ) -> Result<bool, BlockSynchronizerError> {
409        let now = Local::now().naive_utc();
410        let wait_duration = now.signed_duration_since(self.modify_ts);
411        let stale_threshold_chrono = ChronoDuration::from_std(stale_threshold)
412            .map_err(|e| BlockSynchronizerError::DurationConversionError(e.to_string()))?;
413
414        if wait_duration > stale_threshold_chrono {
415            let header_to_use = match (&self.state, fallback_header) {
416                (SynchronizerState::Ready(h), _) |
417                (SynchronizerState::Delayed(h), _) |
418                (SynchronizerState::Stale(h), _) => h.clone(),
419                (_, Some(h)) => h,
420                _ => BlockHeader::default(),
421            };
422
423            warn!(
424                extractor_id=%self.extractor_id,
425                last_message_at=?self.modify_ts,
426                "SynchronizerStream transition to stale due to timeout."
427            );
428            self.state = SynchronizerState::Stale(header_to_use);
429            self.modify_ts = now;
430            Ok(true)
431        } else {
432            Ok(false)
433        }
434    }
435
436    /// Logic to transition a state synchronizer based on newly received block
437    ///
438    /// Updates the synchronizer's state according to the position of the received block:
439    /// - Next expected block -> Ready state
440    /// - Latest/Delayed block -> Either Delayed or Stale (if >60s since last update)
441    /// - Advanced block -> Advanced state (block ahead of expected position)
442    fn transition(
443        &mut self,
444        latest_retrieved: BlockHeader,
445        block_history: &BlockHistory,
446        stale_threshold: std::time::Duration,
447    ) -> Result<(), BlockSynchronizerError> {
448        let extractor_id = self.extractor_id.clone();
449        let last_message_at = self.modify_ts;
450        let block = &latest_retrieved;
451
452        match block_history.determine_block_position(&latest_retrieved)? {
453            BlockPosition::NextExpected => {
454                self.state = SynchronizerState::Ready(latest_retrieved.clone());
455                trace!(
456                    next = %latest_retrieved,
457                    extractor = %extractor_id,
458                    "SynchronizerStream transition to next expected"
459                )
460            }
461            BlockPosition::Latest | BlockPosition::Delayed => {
462                if !self.check_and_transition_to_stale_if_needed(
463                    stale_threshold,
464                    Some(latest_retrieved.clone()),
465                )? {
466                    warn!(
467                        %extractor_id,
468                        ?last_message_at,
469                        %block,
470                        "SynchronizerStream transition transition to delayed."
471                    );
472                    self.state = SynchronizerState::Delayed(latest_retrieved.clone());
473                }
474            }
475            BlockPosition::Advanced => {
476                info!(
477                    %extractor_id,
478                    ?last_message_at,
479                    latest = opt(&block_history.latest()),
480                    %block,
481                    "SynchronizerStream transition to advanced."
482                );
483                self.state = SynchronizerState::Advanced(latest_retrieved.clone());
484            }
485        }
486        self.modify_ts = Local::now().naive_utc();
487        Ok(())
488    }
489
490    /// Marks this stream as errored
491    ///
492    /// Sets an error and transitions the stream to Ended, correctly recording the
493    /// time at which this happened.
494    fn mark_errored(&mut self, error: SynchronizerError) {
495        self.state = SynchronizerState::Ended(error.to_string());
496        self.modify_ts = Local::now().naive_utc();
497        self.error = Some(error);
498    }
499
500    /// Marks a stream as closed.
501    ///
502    /// If the stream has not been ended previously, e.g. by an error it will be marked
503    /// as Ended without error. This should not happen since we should stop consuming
504    /// from the stream if an error occured.
505    fn mark_closed(&mut self) {
506        if !matches!(self.state, SynchronizerState::Ended(_)) {
507            self.state = SynchronizerState::Ended("Closed".to_string());
508            self.modify_ts = Local::now().naive_utc();
509        }
510    }
511
512    /// Marks a stream as stale.
513    fn mark_stale(&mut self, header: &BlockHeader) {
514        self.state = SynchronizerState::Stale(header.clone());
515        self.modify_ts = Local::now().naive_utc();
516    }
517
518    /// Marks this stream as ready.
519    fn mark_ready(&mut self, header: &BlockHeader) {
520        self.state = SynchronizerState::Ready(header.clone());
521        self.modify_ts = Local::now().naive_utc();
522    }
523
524    fn has_ended(&self) -> bool {
525        matches!(self.state, SynchronizerState::Ended(_))
526    }
527
528    fn is_stale(&self) -> bool {
529        matches!(self.state, SynchronizerState::Stale(_))
530    }
531
532    fn is_advanced(&self) -> bool {
533        matches!(self.state, SynchronizerState::Advanced(_))
534    }
535
536    /// Gets the streams current header from active streams.
537    ///
538    /// A stream is considered as active unless it has ended or is stale.
539    fn get_current_header(&self) -> Option<&BlockHeader> {
540        match &self.state {
541            SynchronizerState::Ready(b) |
542            SynchronizerState::Delayed(b) |
543            SynchronizerState::Advanced(b) => Some(b),
544            _ => None,
545        }
546    }
547}
548
549#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
550pub struct FeedMessage<H = BlockHeader>
551where
552    H: HeaderLike,
553{
554    pub state_msgs: HashMap<String, StateSyncMessage<H>>,
555    pub sync_states: HashMap<String, SynchronizerState>,
556}
557
558impl<H> FeedMessage<H>
559where
560    H: HeaderLike,
561{
562    fn new(
563        state_msgs: HashMap<String, StateSyncMessage<H>>,
564        sync_states: HashMap<String, SynchronizerState>,
565    ) -> Self {
566        Self { state_msgs, sync_states }
567    }
568}
569
570impl<S> BlockSynchronizer<S>
571where
572    S: StateSynchronizer,
573{
574    pub fn new(
575        block_time: std::time::Duration,
576        latency_buffer: std::time::Duration,
577        max_missed_blocks: u64,
578    ) -> Self {
579        Self {
580            synchronizers: None,
581            max_messages: None,
582            block_time,
583            latency_buffer,
584            startup_timeout: block_time.mul_f64(max_missed_blocks as f64),
585            max_missed_blocks,
586        }
587    }
588
589    /// Limits the stream to emit a maximum number of messages.
590    ///
591    /// After the stream emitted max messages it will end. This is only useful for
592    /// testing purposes or if you only want to process a fixed amount of messages
593    /// and then terminate cleanly.
594    pub fn max_messages(&mut self, val: usize) {
595        self.max_messages = Some(val);
596    }
597
598    /// Sets timeout for the first message of a protocol.
599    ///
600    /// Time to wait for the full first message, including snapshot retrieval.
601    pub fn startup_timeout(mut self, val: Duration) {
602        self.startup_timeout = val;
603    }
604
605    pub fn register_synchronizer(mut self, id: ExtractorIdentity, synchronizer: S) -> Self {
606        let mut registered = self.synchronizers.unwrap_or_default();
607        registered.insert(id, synchronizer);
608        self.synchronizers = Some(registered);
609        self
610    }
611
612    #[cfg(test)]
613    pub fn with_short_timeouts() -> Self {
614        Self::new(Duration::from_millis(10), Duration::from_millis(10), 3)
615    }
616
617    /// Cleanup function for shutting down remaining synchronizers when the nanny detects an error.
618    /// Sends close signals to all remaining synchronizers and waits for them to complete.
619    async fn cleanup_synchronizers(
620        mut state_sync_tasks: FuturesUnordered<JoinHandle<()>>,
621        sync_close_senders: Vec<oneshot::Sender<()>>,
622    ) {
623        // Send close signals to all remaining synchronizers
624        for close_sender in sync_close_senders {
625            let _ = close_sender.send(());
626        }
627
628        // Await remaining tasks with timeout
629        let mut completed_tasks = 0;
630        while let Ok(Some(_)) = timeout(Duration::from_secs(5), state_sync_tasks.next()).await {
631            completed_tasks += 1;
632        }
633
634        // Warn if any synchronizers timed out during cleanup
635        let remaining_tasks = state_sync_tasks.len();
636        if remaining_tasks > 0 {
637            warn!(
638                completed = completed_tasks,
639                timed_out = remaining_tasks,
640                "Some synchronizers timed out during cleanup and may not have shut down cleanly"
641            );
642        }
643    }
644
645    /// Starts the synchronization of streams.
646    ///
647    /// Will error directly if the startup fails. Once the startup is complete, it will
648    /// communicate any fatal errors through the stream before closing it.
649    pub async fn run(
650        mut self,
651    ) -> BlockSyncResult<(JoinHandle<()>, Receiver<BlockSyncResult<FeedMessage<BlockHeader>>>)>
652    {
653        trace!("Starting BlockSynchronizer...");
654        let state_sync_tasks = FuturesUnordered::new();
655        let mut synchronizers = self
656            .synchronizers
657            .take()
658            .ok_or(BlockSynchronizerError::NoSynchronizers)?;
659        // init synchronizers
660        let init_tasks = synchronizers
661            .values_mut()
662            .map(|s| s.initialize())
663            .collect::<Vec<_>>();
664        try_join_all(init_tasks).await?;
665
666        let mut sync_streams = Vec::with_capacity(synchronizers.len());
667        let mut sync_close_senders = Vec::new();
668        for (extractor_id, synchronizer) in synchronizers.drain() {
669            let (handle, rx) = synchronizer.start().await;
670            let (join_handle, close_sender) = handle.split();
671            state_sync_tasks.push(join_handle);
672            sync_close_senders.push(close_sender);
673
674            sync_streams.push(SynchronizerStream::new(&extractor_id, rx));
675        }
676
677        // startup, schedule first set of futures and wait for them to return to initialise
678        // synchronizers.
679        debug!("Waiting for initial synchronizer messages...");
680        let mut startup_futures = Vec::new();
681        for synchronizer in sync_streams.iter_mut() {
682            let fut = async {
683                let res = timeout(self.startup_timeout, synchronizer.rx.recv()).await;
684                (synchronizer, res)
685            };
686            startup_futures.push(fut);
687        }
688
689        let mut ready_sync_msgs = HashMap::new();
690        let initial_headers = join_all(startup_futures)
691            .await
692            .into_iter()
693            .filter_map(|(synchronizer, res)| {
694                let extractor_id = synchronizer.extractor_id.clone();
695                match res {
696                    Ok(Some(Ok(msg))) => {
697                        debug!(%extractor_id, height=?&msg.header.number, "Synchronizer started successfully!");
698                        // initially default all synchronizers to Ready
699                        synchronizer.mark_ready(&msg.header);
700                        let header = msg.header.clone();
701                        ready_sync_msgs.insert(extractor_id.name.clone(), msg);
702                        Some(header)
703                    }
704                    Ok(Some(Err(e))) => {
705                        synchronizer.mark_errored(e);
706                        None
707                    }
708                    Ok(None) => {
709                        // Synchronizer closed channel. This can only happen if the run
710                        // task ended, before this, the synchronizer should have sent
711                        // an error, so this case we likely don't have to handle that
712                        // explicitly
713                        warn!(%extractor_id, "Synchronizer closed during startup");
714                        synchronizer.mark_closed();
715                        None
716                    }
717                    Err(_) => {
718                        // We got an error because the synchronizer timed out during startup
719                        warn!(%extractor_id, "Timed out waiting for first message");
720                        synchronizer.mark_stale(&BlockHeader::default());
721                        None
722                    }
723                }
724            })
725            .collect::<HashSet<_>>() // remove duplicates
726            .into_iter()
727            .collect::<Vec<_>>();
728
729        // Ensures we have at least one ready stream
730        Self::check_streams(&sync_streams)?;
731        let mut block_history = BlockHistory::new(initial_headers, 15)?;
732        // Determine the starting header for synchronization
733        let start_header = block_history
734            .latest()
735            // Safe, as we have checked this invariant with `check_streams`
736            .ok_or(BlockHistoryError::EmptyHistory)?;
737        info!(
738            start_block=%start_header,
739            n_healthy=ready_sync_msgs.len(),
740            n_total=sync_streams.len(),
741            "Block synchronization started successfully!"
742        );
743
744        // Determine correct state for each remaining synchronizer, based on their header vs the
745        // latest one
746        for stream in sync_streams.iter_mut() {
747            if let SynchronizerState::Ready(header) = stream.state.clone() {
748                if header.number < start_header.number {
749                    debug!(
750                        extractor_id=%stream.extractor_id,
751                        synchronizer_block=header.number,
752                        current_block=start_header.number,
753                        "Marking synchronizer as delayed during initialization"
754                    );
755                    stream.state = SynchronizerState::Delayed(header);
756                }
757            }
758        }
759
760        let (sync_tx, sync_rx) = mpsc::channel(30);
761        let main_loop_jh = tokio::spawn(async move {
762            let mut n_iter = 1;
763            loop {
764                // Send retrieved data to receivers.
765                let msg = FeedMessage::new(
766                    std::mem::take(&mut ready_sync_msgs),
767                    sync_streams
768                        .iter()
769                        .map(|stream| (stream.extractor_id.name.to_string(), stream.state.clone()))
770                        .collect(),
771                );
772                if sync_tx.send(Ok(msg)).await.is_err() {
773                    info!("Receiver closed, block synchronizer terminating..");
774                    return;
775                };
776
777                // Check if we have reached the max messages
778                if let Some(max_messages) = self.max_messages {
779                    if n_iter >= max_messages {
780                        info!(max_messages, "StreamEnd");
781                        return;
782                    }
783                }
784                n_iter += 1;
785
786                let res = self
787                    .handle_next_message(
788                        &mut sync_streams,
789                        &mut ready_sync_msgs,
790                        &mut block_history,
791                    )
792                    .await;
793
794                if let Err(e) = res {
795                    // Communicate error to clients, then end the loop
796                    let _ = sync_tx.send(Err(e)).await;
797                    return;
798                }
799            }
800        });
801
802        // We await the main loop and log any panics (should be impossible). If the
803        // main loop exits, all synchronizers should be ended or stale. So we kill any
804        // remaining stale ones just in case. A final error is propagated through the
805        // channel to the user.
806        let nanny_jh = tokio::spawn(async move {
807            // report any panics
808            let _ = main_loop_jh.await.map_err(|e| {
809                if e.is_panic() {
810                    error!("BlockSynchornizer main loop panicked: {e}")
811                }
812            });
813            debug!("Main loop exited. Closing synchronizers");
814            Self::cleanup_synchronizers(state_sync_tasks, sync_close_senders).await;
815            debug!("Shutdown complete");
816        });
817        Ok((nanny_jh, sync_rx))
818    }
819
820    /// Retrieves next message from synchronizers
821    ///
822    /// The result is written into `ready_sync_messages`. Errors only if there is a
823    /// non-recoverable error or all synchronizers have ended.
824    async fn handle_next_message(
825        &self,
826        sync_streams: &mut [SynchronizerStream],
827        ready_sync_msgs: &mut HashMap<String, StateSyncMessage<BlockHeader>>,
828        block_history: &mut BlockHistory,
829    ) -> BlockSyncResult<()> {
830        let mut recv_futures = Vec::new();
831        for stream in sync_streams.iter_mut() {
832            // If stream is in ended state, do not check for any messages (it's receiver
833            // is closed), but do check stale streams.
834            if stream.has_ended() {
835                continue;
836            }
837            // Here we simply wait block_time + max_wait. This will not work for chains with
838            // unknown block times but is simple enough for now.
839            // If we would like to support unknown block times we could: Instruct all handles to
840            // await the max block time, if a header arrives within that time transition as
841            // usual, but via a select statement get notified (using e.g. Notify) if any other
842            // handle finishes before the timeout. Then await again but this time only for
843            // max_wait and then proceed as usual. So basically each try_advance task would have
844            // a select statement that allows it to exit the first timeout preemptively if any
845            // other try_advance task finished earlier.
846            recv_futures.push(async {
847                let res = stream
848                    .try_advance(
849                        block_history,
850                        self.block_time,
851                        self.latency_buffer,
852                        self.block_time
853                            .mul_f64(self.max_missed_blocks as f64),
854                    )
855                    .await?;
856                Ok::<_, BlockSynchronizerError>(
857                    res.map(|msg| (stream.extractor_id.name.clone(), msg)),
858                )
859            });
860        }
861        ready_sync_msgs.extend(
862            join_all(recv_futures)
863                .await
864                .into_iter()
865                .collect::<Result<Vec<_>, _>>()?
866                .into_iter()
867                .flatten(),
868        );
869
870        // Check if we have any active synchronizers (Ready, Delayed, or Advanced)
871        // If all synchronizers have been purged (Stale/Ended), exit the main loop
872        Self::check_streams(sync_streams)?;
873
874        // if we have any advanced header, we reinit the block history,
875        // else we simply advance the existing history
876        if sync_streams
877            .iter()
878            .any(SynchronizerStream::is_advanced)
879        {
880            *block_history = Self::reinit_block_history(sync_streams, block_history)?;
881        } else {
882            let header = sync_streams
883                .iter()
884                .filter_map(SynchronizerStream::get_current_header)
885                .max_by_key(|b| b.number)
886                // Safe, as we have checked this invariant with `check_streams`
887                .ok_or(BlockSynchronizerError::NoReadySynchronizers(
888                    "Expected to have at least one synchronizer that is not stale or ended"
889                        .to_string(),
890                ))?;
891            block_history.push(header.clone())?;
892        }
893        Ok(())
894    }
895
896    /// Reinitialise block history and reclassifies active synchronizers states.
897    ///
898    /// We call this if we detect a future detached block. This usually only happens if
899    /// a synchronizer has a restart.
900    ///
901    /// ## Note
902    /// This method assumes that at least one synchronizer is in Advanced, Ready or
903    /// Delayed state, it will return an error in case this is not the case.
904    fn reinit_block_history(
905        sync_streams: &mut [SynchronizerStream],
906        block_history: &mut BlockHistory,
907    ) -> Result<BlockHistory, BlockSynchronizerError> {
908        let previous = block_history
909            .latest()
910            // Old block history should not be empty, startup should have populated it at this point
911            .ok_or(BlockHistoryError::EmptyHistory)?;
912        let blocks = sync_streams
913            .iter()
914            .filter_map(SynchronizerStream::get_current_header)
915            .cloned()
916            .collect();
917        let new_block_history = BlockHistory::new(blocks, 10)?;
918        let latest = new_block_history
919            .latest()
920            // Block history should not be empty, we just populated it.
921            .ok_or(BlockHistoryError::EmptyHistory)?;
922        info!(
923             %previous,
924            %latest,
925            "Advanced synchronizer detected. Reinitialized block history."
926        );
927        sync_streams
928            .iter_mut()
929            .for_each(|stream| {
930                // we only get headers from advanced, ready and delayed so stale
931                // or ended streams are not considered here
932                if let Some(header) = stream.get_current_header() {
933                    if header.number < latest.number {
934                        stream.state = SynchronizerState::Delayed(header.clone());
935                    } else if header.number == latest.number {
936                        stream.state = SynchronizerState::Ready(header.clone());
937                    }
938                }
939            });
940        Ok(new_block_history)
941    }
942
943    /// Checks if we still have at least one active stream else errors.
944    ///
945    /// If there are no active streams meaning all are ended or stale, it returns a
946    /// summary error message for the state of all synchronizers.
947    fn check_streams(sync_streams: &[SynchronizerStream]) -> BlockSyncResult<()> {
948        let mut latest_errored_stream: Option<&SynchronizerStream> = None;
949
950        for stream in sync_streams.iter() {
951            // If we have at least one active stream, we can return early
952            if !stream.has_ended() && !stream.is_stale() {
953                return Ok(());
954            }
955
956            // Otherwise, we track the latest errored stream for reporting
957            if latest_errored_stream.is_none() ||
958                stream.modify_ts >
959                    latest_errored_stream
960                        .as_ref()
961                        .unwrap()
962                        .modify_ts
963            {
964                latest_errored_stream = Some(stream);
965            }
966        }
967
968        let last_error_reason = if let Some(stream) = latest_errored_stream {
969            if let Some(err) = &stream.error {
970                format!("Synchronizer for {} errored with: {err}", stream.extractor_id)
971            } else {
972                format!("Synchronizer for {} became: {}", stream.extractor_id, stream.state)
973            }
974        } else {
975            return Err(BlockSynchronizerError::NoSynchronizers);
976        };
977
978        let mut reason = vec![last_error_reason];
979
980        sync_streams.iter().for_each(|stream| {
981            reason.push(format!(
982                "{} reported as {} at {}",
983                stream.extractor_id, stream.state, stream.modify_ts
984            ))
985        });
986
987        Err(BlockSynchronizerError::NoReadySynchronizers(reason.join(", ")))
988    }
989}
990
991#[cfg(test)]
992mod tests {
993    use std::sync::Arc;
994
995    use async_trait::async_trait;
996    use test_log::test;
997    use tokio::sync::{oneshot, Mutex};
998    use tycho_common::dto::Chain;
999
1000    use super::*;
1001    use crate::feed::synchronizer::{SyncResult, SynchronizerTaskHandle};
1002
    /// How the background task spawned by `MockStateSync::start` behaves.
    #[derive(Clone, Debug)]
    enum MockBehavior {
        /// Wait for a close signal (from the task handle or the test), record it in
        /// `close_received`, then exit.
        Normal,
        /// Ignore close signals and loop forever, simulating a hung synchronizer (for
        /// cleanup-timeout testing).
        IgnoreClose,
        /// Push a `ConnectionError` onto the header channel and exit right away, simulating
        /// an immediate task failure (for quick failure testing).
        ExitImmediately,
    }
1009
    /// Receiving half of a mock's header channel; `MockStateSync::start` hands it to the caller.
    type HeaderReceiver = Receiver<SyncResult<StateSyncMessage<BlockHeader>>>;
1011
    /// Test double for a `StateSynchronizer`, driven manually by the tests.
    ///
    /// Every field except `behavior` is shared through `Arc`, so the clone registered with the
    /// `BlockSynchronizer` and the copy kept by the test see the same channel and flags.
    #[derive(Clone)]
    struct MockStateSync {
        // Sender the test uses to inject header messages (see `send_header`).
        header_tx: mpsc::Sender<SyncResult<StateSyncMessage<BlockHeader>>>,
        // Receiver taken out exactly once by `start`; `None` afterwards.
        header_rx: Arc<Mutex<Option<HeaderReceiver>>>,
        // Set to `true` by the `Normal` background task once it sees a close signal.
        close_received: Arc<Mutex<bool>>,
        behavior: MockBehavior,
        // For testing: store the close sender so tests can trigger close signals
        close_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
    }
1021
1022    impl MockStateSync {
1023        fn new() -> Self {
1024            Self::with_behavior(MockBehavior::Normal)
1025        }
1026
1027        fn with_behavior(behavior: MockBehavior) -> Self {
1028            let (tx, rx) = mpsc::channel(1);
1029            Self {
1030                header_tx: tx,
1031                header_rx: Arc::new(Mutex::new(Some(rx))),
1032                close_received: Arc::new(Mutex::new(false)),
1033                behavior,
1034                close_tx: Arc::new(Mutex::new(None)),
1035            }
1036        }
1037
1038        async fn was_close_received(&self) -> bool {
1039            *self.close_received.lock().await
1040        }
1041
1042        async fn send_header(&self, header: StateSyncMessage<BlockHeader>) -> Result<(), String> {
1043            self.header_tx
1044                .send(Ok(header))
1045                .await
1046                .map_err(|e| format!("sending header failed: {e}"))
1047        }
1048
1049        // For testing: trigger a close signal to make the synchronizer exit
1050        async fn trigger_close(&self) {
1051            if let Some(close_tx) = self.close_tx.lock().await.take() {
1052                let _ = close_tx.send(());
1053            }
1054        }
1055    }
1056
    #[async_trait]
    impl StateSynchronizer for MockStateSync {
        /// No-op: the mock needs no initialisation.
        async fn initialize(&mut self) -> SyncResult<()> {
            Ok(())
        }

        /// Spawns the mock's background task (shaped by `behavior`) and returns its task handle
        /// together with the header receiver.
        ///
        /// # Panics
        /// Panics if the header receiver was already taken, e.g. when `start` is called twice
        /// on mocks sharing the same state.
        async fn start(
            mut self,
        ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>) {
            let block_rx = {
                let mut guard = self.header_rx.lock().await;
                guard
                    .take()
                    .expect("Block receiver was not set!")
            };

            // Create two close channels. One sender goes into the returned handle; the other is
            // stored so tests can fire it through `trigger_close`.
            let (close_tx_for_handle, close_rx) = oneshot::channel();
            let (close_tx_for_test, close_rx_for_test) = oneshot::channel();

            // Store the test close sender
            {
                let mut guard = self.close_tx.lock().await;
                *guard = Some(close_tx_for_test);
            }

            let behavior = self.behavior.clone();
            let close_received_clone = self.close_received.clone();
            let tx = self.header_tx.clone();

            let jh = tokio::spawn(async move {
                match behavior {
                    MockBehavior::IgnoreClose => {
                        // Infinite loop to simulate a hung synchronizer that doesn't respond to
                        // close signals
                        loop {
                            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
                        }
                    }
                    MockBehavior::ExitImmediately => {
                        // Push an error onto the header channel and return, simulating an
                        // immediate task failure. Panics if the header receiver was dropped.
                        tx.send(SyncResult::Err(SynchronizerError::ConnectionError(
                            "Simulated immediate task failure".to_string(),
                        )))
                        .await
                        .unwrap();
                    }
                    MockBehavior::Normal => {
                        // Wait for a close signal from either the handle or the test (a dropped
                        // sender also completes the wait), then record that it happened.
                        let _ = tokio::select! {
                            result = close_rx => result,
                            result = close_rx_for_test => result,
                        };
                        let mut guard = close_received_clone.lock().await;
                        *guard = true;
                    }
                }
            });

            let handle = SynchronizerTaskHandle::new(jh, close_tx_for_handle);
            (handle, block_rx)
        }
    }
1122
1123    fn header_message(block: u8) -> StateSyncMessage<BlockHeader> {
1124        StateSyncMessage {
1125            header: BlockHeader {
1126                number: block as u64,
1127                hash: Bytes::from(vec![block]),
1128                parent_hash: Bytes::from(vec![block - 1]),
1129                revert: false,
1130                timestamp: 1000,
1131            },
1132            ..Default::default()
1133        }
1134    }
1135
1136    async fn receive_message(rx: &mut Receiver<BlockSyncResult<FeedMessage>>) -> FeedMessage {
1137        timeout(Duration::from_millis(100), rx.recv())
1138            .await
1139            .expect("Responds in time")
1140            .expect("Should receive first message")
1141            .expect("No error")
1142    }
1143
1144    async fn setup_block_sync(
1145    ) -> (MockStateSync, MockStateSync, JoinHandle<()>, Receiver<BlockSyncResult<FeedMessage>>)
1146    {
1147        setup_block_sync_with_behaviour(MockBehavior::Normal, MockBehavior::Normal).await
1148    }
1149
    /// Registers two mocks ("uniswap-v2" and "uniswap-v3") with the given behaviours, feeds
    /// both block 1, starts the block synchronizer and consumes the first feed message.
    ///
    /// Asserts that the first message carries two state messages and reports both
    /// synchronizers as `Ready`.
    ///
    /// Returns the two mocks, the nanny task handle and the feed receiver.
    async fn setup_block_sync_with_behaviour(
        v2_behavior: MockBehavior,
        v3_behavior: MockBehavior,
    ) -> (MockStateSync, MockStateSync, JoinHandle<()>, Receiver<BlockSyncResult<FeedMessage>>)
    {
        let v2_sync = MockStateSync::with_behavior(v2_behavior);
        let v3_sync = MockStateSync::with_behavior(v3_behavior);

        // Use reasonable timeouts to observe proper state transitions
        let mut block_sync = BlockSynchronizer::new(
            Duration::from_millis(20), // block_time
            Duration::from_millis(10), // max_wait
            3,                         // max_missed_blocks (stale threshold = 20ms * 3 = 60ms)
        );
        block_sync.max_messages(10); // Allow enough messages to see the progression

        let block_sync = block_sync
            .register_synchronizer(
                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
                v2_sync.clone(),
            )
            .register_synchronizer(
                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
                v3_sync.clone(),
            );

        // Queue block 1 on both header channels before `run`, so startup finds both
        // synchronizers on the same block.
        let block1_msg = header_message(1);
        let _ = v2_sync
            .send_header(block1_msg.clone())
            .await;
        let _ = v3_sync
            .send_header(block1_msg.clone())
            .await;

        // Start the block synchronizer
        let (nanny_handle, mut rx) = block_sync
            .run()
            .await
            .expect("BlockSynchronizer failed to start");

        let first_feed_msg = receive_message(&mut rx).await;
        assert_eq!(first_feed_msg.state_msgs.len(), 2);
        assert!(matches!(
            first_feed_msg
                .sync_states
                .get("uniswap-v2")
                .unwrap(),
            SynchronizerState::Ready(_)
        ));
        assert!(matches!(
            first_feed_msg
                .sync_states
                .get("uniswap-v3")
                .unwrap(),
            SynchronizerState::Ready(_)
        ));

        (v2_sync, v3_sync, nanny_handle, rx)
    }
1211
1212    async fn shutdown_block_synchronizer(
1213        v2_sync: &MockStateSync,
1214        v3_sync: &MockStateSync,
1215        nanny_handle: JoinHandle<()>,
1216    ) {
1217        v3_sync.trigger_close().await;
1218        v2_sync.trigger_close().await;
1219
1220        timeout(Duration::from_millis(100), nanny_handle)
1221            .await
1222            .expect("Nanny failed to exit within time")
1223            .expect("Nanny panicked");
1224    }
1225
1226    #[test(tokio::test)]
1227    async fn test_two_ready_synchronizers() {
1228        let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1229
1230        let second_msg = header_message(2);
1231        v2_sync
1232            .send_header(second_msg.clone())
1233            .await
1234            .expect("send_header failed");
1235        v3_sync
1236            .send_header(second_msg.clone())
1237            .await
1238            .expect("send_header failed");
1239        let second_feed_msg = receive_message(&mut rx).await;
1240
1241        let exp2 = FeedMessage {
1242            state_msgs: [
1243                ("uniswap-v2".to_string(), second_msg.clone()),
1244                ("uniswap-v3".to_string(), second_msg.clone()),
1245            ]
1246            .into_iter()
1247            .collect(),
1248            sync_states: [
1249                ("uniswap-v3".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
1250                ("uniswap-v2".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
1251            ]
1252            .into_iter()
1253            .collect(),
1254        };
1255        assert_eq!(second_feed_msg, exp2);
1256
1257        shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1258    }
1259
1260    #[test(tokio::test)]
1261    async fn test_delayed_synchronizer_catches_up() {
1262        let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1263
1264        // Send block 2 to v2 synchronizer only
1265        let block2_msg = header_message(2);
1266        v2_sync
1267            .send_header(block2_msg.clone())
1268            .await
1269            .expect("send_header failed");
1270
1271        // Consume second message - v3 should be delayed
1272        let second_feed_msg = receive_message(&mut rx).await;
1273        debug!("Consumed second message for v2");
1274
1275        assert!(second_feed_msg
1276            .state_msgs
1277            .contains_key("uniswap-v2"));
1278        assert!(matches!(
1279            second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1280            SynchronizerState::Ready(header) if header.number == 2
1281        ));
1282        assert!(!second_feed_msg
1283            .state_msgs
1284            .contains_key("uniswap-v3"));
1285        assert!(matches!(
1286            second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1287            SynchronizerState::Delayed(header) if header.number == 1
1288        ));
1289
1290        // Now v3 catches up to block 2
1291        v3_sync
1292            .send_header(block2_msg.clone())
1293            .await
1294            .expect("send_header failed");
1295
1296        // Both advance to block 3
1297        let block3_msg = header_message(3);
1298        v2_sync
1299            .send_header(block3_msg.clone())
1300            .await
1301            .expect("send_header failed");
1302        v3_sync
1303            .send_header(block3_msg)
1304            .await
1305            .expect("send_header failed");
1306
1307        // Consume messages until we get both synchronizers on block 3
1308        // We may get an intermediate message for v3's catch-up or a combined message
1309        let mut third_feed_msg = receive_message(&mut rx).await;
1310
1311        // If this message doesn't have both univ2, it's an intermediate message, so we get the next
1312        // one
1313        if !third_feed_msg
1314            .state_msgs
1315            .contains_key("uniswap-v2")
1316        {
1317            third_feed_msg = rx
1318                .recv()
1319                .await
1320                .expect("header channel was closed")
1321                .expect("no error");
1322        }
1323        assert!(third_feed_msg
1324            .state_msgs
1325            .contains_key("uniswap-v2"));
1326        assert!(third_feed_msg
1327            .state_msgs
1328            .contains_key("uniswap-v3"));
1329        assert!(matches!(
1330            third_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1331            SynchronizerState::Ready(header) if header.number == 3
1332        ));
1333        assert!(matches!(
1334            third_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1335            SynchronizerState::Ready(header) if header.number == 3
1336        ));
1337
1338        shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1339    }
1340
    /// The synchronizers start on different blocks: v2 on block 1, v3 on block 2. Startup
    /// must pick block 2, marking v2 `Delayed` and v3 `Ready`. After v2 catches up and both
    /// deliver block 3, both must be `Ready` on block 3.
    #[test(tokio::test)]
    async fn test_different_start_blocks() {
        let v2_sync = MockStateSync::new();
        let v3_sync = MockStateSync::new();
        let block_sync = BlockSynchronizer::with_short_timeouts()
            .register_synchronizer(
                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
                v2_sync.clone(),
            )
            .register_synchronizer(
                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
                v3_sync.clone(),
            );

        // Initial messages - synchronizers at different blocks
        let block1_msg = header_message(1);
        let block2_msg = header_message(2);

        let _ = v2_sync
            .send_header(block1_msg.clone())
            .await;
        v3_sync
            .send_header(block2_msg.clone())
            .await
            .expect("send_header failed");

        // Start the block synchronizer - it should use block 2 as the starting block
        let (jh, mut rx) = block_sync
            .run()
            .await
            .expect("BlockSynchronizer failed to start.");

        // Consume first message: v2 (block 1) is behind the start block, so it is Delayed
        let first_feed_msg = receive_message(&mut rx).await;
        assert!(matches!(
            first_feed_msg.sync_states.get("uniswap-v2").unwrap(),
            SynchronizerState::Delayed(header) if header.number == 1
        ));
        assert!(matches!(
            first_feed_msg.sync_states.get("uniswap-v3").unwrap(),
            SynchronizerState::Ready(header) if header.number == 2
        ));

        // Now v2 catches up to block 2
        v2_sync
            .send_header(block2_msg.clone())
            .await
            .expect("send_header failed");

        // Both advance to block 3
        let block3_msg = header_message(3);
        let _ = v2_sync
            .send_header(block3_msg.clone())
            .await;
        v3_sync
            .send_header(block3_msg.clone())
            .await
            .expect("send_header failed");

        // Consume the next feed message - both should now be on block 3
        let second_feed_msg = receive_message(&mut rx).await;
        assert_eq!(second_feed_msg.state_msgs.len(), 2);
        assert!(matches!(
            second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
            SynchronizerState::Ready(header) if header.number == 3
        ));
        assert!(matches!(
            second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
            SynchronizerState::Ready(header) if header.number == 3
        ));

        shutdown_block_synchronizer(&v2_sync, &v3_sync, jh).await;
    }
1414
    /// v2's task pushes an error and exits right after startup, while v3 never receives
    /// another header.
    ///
    /// The test expects:
    /// - every intermediate feed message reports v2 as `Ended` and v3 as `Delayed`;
    /// - once no active synchronizer remains, a final error is delivered;
    /// - the nanny then finishes and sends v3 its close signal during cleanup.
    #[test(tokio::test)]
    async fn test_synchronizer_fails_other_goes_stale() {
        let (_v2_sync, v3_sync, nanny_handle, mut sync_rx) =
            setup_block_sync_with_behaviour(MockBehavior::ExitImmediately, MockBehavior::Normal)
                .await;

        // Read up to three more feed items. NOTE(review): `recv` has no timeout here; the
        // loop relies on the feed channel yielding or closing promptly.
        let mut error_reported = false;
        for _ in 0..3 {
            if let Some(msg) = sync_rx.recv().await {
                match msg {
                    Err(_) => error_reported = true,
                    Ok(msg) => {
                        assert!(matches!(
                            msg.sync_states
                                .get("uniswap-v3")
                                .unwrap(),
                            SynchronizerState::Delayed(_)
                        ));
                        assert!(matches!(
                            msg.sync_states
                                .get("uniswap-v2")
                                .unwrap(),
                            SynchronizerState::Ended(_)
                        ));
                    }
                }
            }
        }
        assert!(error_reported, "BlockSynchronizer did not report final error");

        // Wait for nanny to detect task failure and execute cleanup
        let result = timeout(Duration::from_secs(2), nanny_handle).await;
        assert!(result.is_ok(), "Nanny should complete when synchronizer task exits");

        // Verify that the remaining synchronizer received close signal during cleanup
        assert!(
            v3_sync.was_close_received().await,
            "v3_sync should have received close signal during cleanup"
        );
    }
1455
    /// Cleanup must still finish when a synchronizer ignores its close signal.
    #[test(tokio::test)]
    async fn test_cleanup_timeout_warning() {
        // v2 fails immediately and v3 never reacts to close signals. The nanny is expected
        // to finish regardless. The timeout warning emitted by cleanup_synchronizers is not
        // asserted here. `_rx` (not `_`) keeps the feed receiver alive, so the main loop does
        // not stop just because the receiver was dropped.
        let (_v2_sync, _v3_sync, nanny_handle, _rx) = setup_block_sync_with_behaviour(
            MockBehavior::ExitImmediately,
            MockBehavior::IgnoreClose,
        )
        .await;

        // Wait for nanny to complete - cleanup should timeout on v3_sync but still complete
        let result = timeout(Duration::from_secs(10), nanny_handle).await;
        assert!(
            result.is_ok(),
            "Nanny should complete even when some synchronizers timeout during cleanup"
        );

        // Note: verifying the warning itself would require capturing log output; this test
        // only checks that cleanup completes even when a synchronizer times out.
    }
1477
    /// Only v3 keeps receiving headers. v2 must first be reported `Delayed`, with the nanny
    /// still running, and eventually `Stale`, while v3 stays healthy.
    #[test(tokio::test)]
    async fn test_one_synchronizer_goes_stale_while_other_works() {
        // Test Case 1: One protocol goes stale while another protocol keeps working normally
        let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;

        // Send block 2 only to v3, v2 will timeout and become delayed
        let block2_msg = header_message(2);
        let _ = v3_sync
            .send_header(block2_msg.clone())
            .await;
        // Don't send to v2_sync - it will timeout

        // Consume second message - v2 should be delayed, v3 ready
        let second_feed_msg = receive_message(&mut rx).await;
        assert!(second_feed_msg
            .state_msgs
            .contains_key("uniswap-v3"));
        assert!(!second_feed_msg
            .state_msgs
            .contains_key("uniswap-v2"));
        assert!(matches!(
            second_feed_msg
                .sync_states
                .get("uniswap-v3")
                .unwrap(),
            SynchronizerState::Ready(_)
        ));
        // v2 should be delayed (if still present) - check nanny is still running
        if let Some(v2_state) = second_feed_msg
            .sync_states
            .get("uniswap-v2")
        {
            if matches!(v2_state, SynchronizerState::Delayed(_)) {
                // Verify nanny is still running when synchronizer is just delayed
                assert!(
                    !nanny_handle.is_finished(),
                    "Nanny should still be running when synchronizer is delayed (not stale yet)"
                );
            }
        }

        // Wait a bit, then continue sending blocks to v3 but not v2
        tokio::time::sleep(Duration::from_millis(15)).await;

        // Continue sending blocks only to v3 to keep it healthy while v2 goes stale
        let block3_msg = header_message(3);
        let _ = v3_sync
            .send_header(block3_msg.clone())
            .await;

        // Give v2 time to pass the stale threshold (60ms with the setup's timings)
        tokio::time::sleep(Duration::from_millis(40)).await;

        // Look for a Stale report for v2 in the next two feed items. NOTE(review): `recv` has
        // no timeout here; the loop relies on the feed yielding or closing promptly.
        let mut stale_found = false;
        for _ in 0..2 {
            if let Some(Ok(msg)) = rx.recv().await {
                if let Some(SynchronizerState::Stale(_)) = msg.sync_states.get("uniswap-v2") {
                    stale_found = true;
                }
            }
        }
        assert!(stale_found, "v2 synchronizer should be stale");

        shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
    }
1542
1543    #[test(tokio::test)]
1544    async fn test_all_synchronizers_go_stale_main_loop_exits() {
1545        // Test Case 2: All protocols go stale and main loop exits gracefully
1546        let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1547
1548        // Stop sending messages to both synchronizers - they should both timeout and go stale
1549        // Don't send any more messages, let them timeout and become delayed, then stale
1550
1551        // Monitor the state transitions to ensure proper delayed -> stale progression
1552        let mut seen_delayed = false;
1553
1554        // Consume messages and track state transitions
1555        // Give enough time for the synchronizers to transition through states
1556        let timeout_duration = Duration::from_millis(500); // Generous timeout
1557        let start_time = tokio::time::Instant::now();
1558
1559        while let Ok(Some(Ok(msg))) =
1560            tokio::time::timeout(Duration::from_millis(50), rx.recv()).await
1561        {
1562            // Track when synchronizers transition to delayed
1563            if !seen_delayed {
1564                let v2_state = msg.sync_states.get("uniswap-v2");
1565                let v3_state = msg.sync_states.get("uniswap-v3");
1566
1567                if matches!(v2_state, Some(SynchronizerState::Delayed(_))) ||
1568                    matches!(v3_state, Some(SynchronizerState::Delayed(_)))
1569                {
1570                    seen_delayed = true;
1571                    // Verify nanny is still running when synchronizers are just delayed
1572                    assert!(!nanny_handle.is_finished(),
1573                        "Nanny should still be running when synchronizers are delayed (not stale yet)");
1574                    // Once we've seen delayed and verified nanny is running, we can break
1575                    break;
1576                }
1577            }
1578
1579            // Safety timeout to avoid infinite loop
1580            if start_time.elapsed() > timeout_duration {
1581                break;
1582            }
1583        }
1584        // Verify that synchronizers went through proper state transitions
1585        assert!(seen_delayed, "Synchronizers should transition to Delayed state first");
1586
1587        let mut error_reported = false;
1588        // Consume any remaining messages until channel closes
1589        while let Some(msg) = rx.recv().await {
1590            if let Err(e) = msg {
1591                assert!(e
1592                    .to_string()
1593                    .contains("became: Stale(1)"));
1594                assert!(e
1595                    .to_string()
1596                    .contains("reported as Stale(1)"));
1597                error_reported = true;
1598            }
1599        }
1600        assert!(error_reported, "Expected the channel to report an error before closing");
1601
1602        // The nanny should complete when the main loop exits due to no ready synchronizers
1603        let nanny_result = timeout(Duration::from_secs(2), nanny_handle).await;
1604        assert!(nanny_result.is_ok(), "Nanny should complete when main loop exits");
1605
1606        // Verify cleanup was triggered for both synchronizers
1607        assert!(
1608            v2_sync.was_close_received().await,
1609            "v2_sync should have received close signal during cleanup"
1610        );
1611        assert!(
1612            v3_sync.was_close_received().await,
1613            "v3_sync should have received close signal during cleanup"
1614        );
1615    }
1616
1617    #[test(tokio::test)]
1618    async fn test_stale_synchronizer_recovers() {
1619        // Test Case 2: All protocols go stale and main loop exits gracefully
1620        let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1621
1622        // Send second messages to v2 only, shortly before both would go stale
1623        tokio::time::sleep(Duration::from_millis(50)).await;
1624        let block2_msg = header_message(2);
1625        let _ = v2_sync
1626            .send_header(block2_msg.clone())
1627            .await;
1628
1629        // we should get two messages here
1630        for _ in 0..2 {
1631            if let Some(msg) = rx.recv().await {
1632                if let Ok(msg) = msg {
1633                    if matches!(
1634                        msg.sync_states
1635                            .get("uniswap-v2")
1636                            .unwrap(),
1637                        SynchronizerState::Ready(_)
1638                    ) {
1639                        assert!(matches!(
1640                            msg.sync_states
1641                                .get("uniswap-v3")
1642                                .unwrap(),
1643                            SynchronizerState::Delayed(_)
1644                        ));
1645                        break;
1646                    };
1647                }
1648            } else {
1649                panic!("Channel closed unexpectedly")
1650            }
1651        }
1652
1653        // Now v3 should be stale
1654        tokio::time::sleep(Duration::from_millis(15)).await;
1655        let block3_msg = header_message(3);
1656        let _ = v2_sync
1657            .send_header(block3_msg.clone())
1658            .await;
1659        let third_msg = receive_message(&mut rx).await;
1660        dbg!(&third_msg);
1661        assert!(matches!(
1662            third_msg
1663                .sync_states
1664                .get("uniswap-v2")
1665                .unwrap(),
1666            SynchronizerState::Ready(_)
1667        ));
1668        assert!(matches!(
1669            third_msg
1670                .sync_states
1671                .get("uniswap-v3")
1672                .unwrap(),
1673            SynchronizerState::Stale(_)
1674        ));
1675
1676        let block4_msg = header_message(4);
1677        let _ = v3_sync
1678            .send_header(block2_msg.clone())
1679            .await;
1680        let _ = v3_sync
1681            .send_header(block3_msg.clone())
1682            .await;
1683        let _ = v3_sync
1684            .send_header(block4_msg.clone())
1685            .await;
1686        let _ = v2_sync
1687            .send_header(block4_msg.clone())
1688            .await;
1689        let fourth_msg = receive_message(&mut rx).await;
1690        assert!(matches!(
1691            fourth_msg
1692                .sync_states
1693                .get("uniswap-v2")
1694                .unwrap(),
1695            SynchronizerState::Ready(_)
1696        ));
1697        assert!(matches!(
1698            fourth_msg
1699                .sync_states
1700                .get("uniswap-v3")
1701                .unwrap(),
1702            SynchronizerState::Ready(_)
1703        ));
1704
1705        shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1706
1707        // Verify cleanup was triggered for both synchronizers
1708        assert!(
1709            v2_sync.was_close_received().await,
1710            "v2_sync should have received close signal during cleanup"
1711        );
1712        assert!(
1713            v3_sync.was_close_received().await,
1714            "v3_sync should have received close signal during cleanup"
1715        );
1716    }
1717
1718    #[test(tokio::test)]
1719    async fn test_all_synchronizer_advanced() {
1720        // Test the case were all synchronizers successfully recover but stream
1721        // from a disconnected future block.
1722
1723        let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1724
1725        let block3 = header_message(3);
1726        v2_sync
1727            .send_header(block3.clone())
1728            .await
1729            .unwrap();
1730        v3_sync
1731            .send_header(block3)
1732            .await
1733            .unwrap();
1734
1735        let msg = receive_message(&mut rx).await;
1736        matches!(
1737            msg.sync_states
1738                .get("uniswap-v2")
1739                .unwrap(),
1740            SynchronizerState::Ready(_)
1741        );
1742        matches!(
1743            msg.sync_states
1744                .get("uniswap-v3")
1745                .unwrap(),
1746            SynchronizerState::Ready(_)
1747        );
1748
1749        shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1750    }
1751
1752    #[test(tokio::test)]
1753    async fn test_one_synchronizer_advanced() {
1754        let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1755
1756        let block2 = header_message(2);
1757        let block4 = header_message(4);
1758        v2_sync
1759            .send_header(block4.clone())
1760            .await
1761            .unwrap();
1762        v3_sync
1763            .send_header(block2.clone())
1764            .await
1765            .unwrap();
1766
1767        let msg = receive_message(&mut rx).await;
1768        matches!(
1769            msg.sync_states
1770                .get("uniswap-v2")
1771                .unwrap(),
1772            SynchronizerState::Ready(_)
1773        );
1774        matches!(
1775            msg.sync_states
1776                .get("uniswap-v3")
1777                .unwrap(),
1778            SynchronizerState::Delayed(_)
1779        );
1780
1781        shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1782    }
1783}