tycho_client/feed/
mod.rs

1use std::{
2    collections::{HashMap, HashSet},
3    fmt::{Display, Formatter},
4    time::Duration,
5};
6
7use chrono::{Duration as ChronoDuration, Local, NaiveDateTime};
8use futures03::{
9    future::{join_all, try_join_all},
10    stream::FuturesUnordered,
11    StreamExt,
12};
13use serde::{Deserialize, Serialize};
14use thiserror::Error;
15use tokio::{
16    sync::{
17        mpsc::{self, Receiver},
18        oneshot,
19    },
20    task::JoinHandle,
21    time::timeout,
22};
23use tracing::{debug, error, info, trace, warn};
24use tycho_common::{
25    display::opt,
26    dto::{Block, ExtractorIdentity},
27    Bytes,
28};
29
30use crate::feed::{
31    block_history::{BlockHistory, BlockHistoryError, BlockPosition},
32    synchronizer::{StateSyncMessage, StateSynchronizer, SyncResult, SynchronizerError},
33};
34
35mod block_history;
36pub mod component_tracker;
37pub mod synchronizer;
38
39/// A trait representing a minimal interface for types that behave like a block header.
40///
41/// This abstraction allows working with either full block headers (`BlockHeader`)
42/// or simplified structures that only provide a timestamp (e.g., for RFQ logic).
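///
/// A minimal sketch of a timestamp-only implementor (the type and field are illustrative,
/// not part of this crate):
///
/// ```ignore
/// struct RfqHeader {
///     timestamp: u64,
/// }
///
/// impl HeaderLike for RfqHeader {
///     fn block(self) -> Option<BlockHeader> {
///         // No full block header is available for RFQ-style streams.
///         None
///     }
///
///     fn block_number_or_timestamp(self) -> u64 {
///         self.timestamp
///     }
/// }
/// ```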
43pub trait HeaderLike {
44    fn block(self) -> Option<BlockHeader>;
45    fn block_number_or_timestamp(self) -> u64;
46}
47
48#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, Eq, Hash)]
49pub struct BlockHeader {
50    pub hash: Bytes,
51    pub number: u64,
52    pub parent_hash: Bytes,
53    pub revert: bool,
54    pub timestamp: u64,
55}
56
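/// Formats the header compactly for logs, e.g. `Block #123 [0x1a2b3c4d..]` (the hash is
/// truncated to its first 4 bytes).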
57impl Display for BlockHeader {
58    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Take the first 4 bytes (8 hex chars) of the hash for readability
60        let short_hash = if self.hash.len() >= 4 {
61            hex::encode(&self.hash[..4]) // 4 bytes → 8 hex chars
62        } else {
63            hex::encode(&self.hash)
64        };
65
66        write!(f, "Block #{} [0x{}..]", self.number, short_hash)
67    }
68}
69
70impl BlockHeader {
71    fn from_block(block: &Block, revert: bool) -> Self {
72        Self {
73            hash: block.hash.clone(),
74            number: block.number,
75            parent_hash: block.parent_hash.clone(),
76            revert,
77            timestamp: block.ts.and_utc().timestamp() as u64,
78        }
79    }
80}
81
82impl HeaderLike for BlockHeader {
83    fn block(self) -> Option<BlockHeader> {
84        Some(self)
85    }
86
87    fn block_number_or_timestamp(self) -> u64 {
88        self.number
89    }
90}
91
92#[derive(Error, Debug)]
93pub enum BlockSynchronizerError {
94    #[error("Failed to initialize synchronizer: {0}")]
95    InitializationError(#[from] SynchronizerError),
96
97    #[error("Failed to process new block: {0}")]
98    BlockHistoryError(#[from] BlockHistoryError),
99
100    #[error("Not a single synchronizer was ready: {0}")]
101    NoReadySynchronizers(String),
102
103    #[error("No synchronizers were set")]
104    NoSynchronizers,
105
106    #[error("Failed to convert duration: {0}")]
107    DurationConversionError(String),
108}
109
110type BlockSyncResult<T> = Result<T, BlockSynchronizerError>;
111
112/// Aligns multiple StateSynchronizers on the block dimension.
113///
114/// ## Purpose
115/// The purpose of this component is to handle streams from multiple state synchronizers and
116/// align/merge them according to their blocks. Ideally this should be done in a fault-tolerant way,
117/// meaning we can recover from a state synchronizer suffering from timing issues. E.g. a delayed or
118/// unresponsive state synchronizer might recover again, or an advanced state synchronizer can be
119/// included again once we reach the block it is at.
120///
121/// ## Limitations
/// - Supports only chains with a fixed block time for now, due to the lock-step mechanism.
123///
124/// ## Initialisation
125/// Queries all registered synchronizers for their first message and evaluates the state of each
126/// synchronizer. If a synchronizer's first message is an older block, it is marked as delayed.
127// TODO: what is the startup timeout
/// If no message is received within the startup timeout, the synchronizer is marked as
/// stale.
130///
131/// ## Main loop
/// Once started, the synchronizers are queried concurrently for messages in lock step:
/// the main loop queries all synchronizers in the ready state for their last emitted data,
/// builds the `FeedMessage` and emits it, then schedules the wait procedure for the next block.
135///
136/// ## Synchronization Logic
137///
138/// To classify a synchronizer as delayed, we need to first define the current block. The highest
139/// block number of all ready synchronizers is considered the current block.
140///
/// Once we have the current block we can easily determine which block we expect next, and if a
/// synchronizer delivers an older block we can classify it as delayed.
143///
144/// If any synchronizer is not in the ready state we will try to bring it back to the ready state.
145/// This is done by trying to empty any buffers of a delayed synchronizer or waiting to reach
146/// the height of an advanced synchronizer (and flagging it as such in the meantime).
147///
148/// Of course, we can't wait forever for a synchronizer to reply/recover. All of this must happen
149/// within the block production step of the blockchain:
150/// The wait procedure consists of waiting for any of the individual ProtocolStateSynchronizers
151/// to emit a new message (within a max timeout - several multiples of the block time). Once a
152/// message is received a very short timeout starts for the remaining synchronizers, to deliver a
153/// message. Any synchronizer failing to do so is transitioned to delayed.
154///
155/// ### Note
/// The process described above is the goal; it is not currently implemented like that. Instead we
/// simply wait `block_time` + `latency_buffer`. Synchronizers are expected to respond within that
/// timeout. This is simpler, but only works well on chains with fixed block times.
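///
/// ## Example
/// A minimal construction sketch (the synchronizer value and the extractor identity are
/// placeholders for whatever `StateSynchronizer` implementation is used):
///
/// ```ignore
/// let block_sync = BlockSynchronizer::new(
///     Duration::from_secs(12), // expected block time
///     Duration::from_secs(1),  // latency buffer added on top of the block time
///     10,                      // max missed blocks before a stream is considered stale
/// )
/// .register_synchronizer(extractor_id, state_synchronizer);
/// ```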
159pub struct BlockSynchronizer<S> {
160    synchronizers: Option<HashMap<ExtractorIdentity, S>>,
    /// The usual time to wait for a new block (the chain's block time)
162    block_time: std::time::Duration,
163    /// Added on top of block time to account for latency
164    latency_buffer: std::time::Duration,
165    /// Time to wait for the full first message, including snapshot retrieval
166    startup_timeout: std::time::Duration,
167    /// Optionally, end the stream after emitting max messages
168    max_messages: Option<usize>,
    /// Number of blocks a protocol can be delayed before it is considered stale
170    max_missed_blocks: u64,
171}
172
173#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
174#[serde(tag = "status", rename_all = "lowercase")]
175pub enum SynchronizerState {
176    /// Initial state, assigned before trying to receive any message
177    Started,
178    /// The synchronizer emitted a message for the block as expected
179    Ready(BlockHeader),
180    /// The synchronizer is on a previous block compared to others and is expected to
181    /// catch up soon.
182    Delayed(BlockHeader),
183    /// The synchronizer hasn't emitted messages for > `max_missed_blocks` or has
184    /// fallen far behind. At this point we do not wait for it anymore but it can
185    /// still eventually recover.
186    Stale(BlockHeader),
    /// The synchronizer is on a future block that does not connect to the known history.
    // For this to happen we must have a gap, and a gap usually means a new snapshot from the
    // StateSynchronizer. This can only happen if we are processing too slowly and one or all of
    // the synchronizers restart, e.g. due to websocket connection drops.
191    Advanced(BlockHeader),
192    /// The synchronizer ended with an error.
193    Ended(String),
194}
195
196impl Display for SynchronizerState {
197    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
198        match self {
199            SynchronizerState::Started => write!(f, "Started"),
            SynchronizerState::Ready(b) => write!(f, "Ready({})", b.number),
201            SynchronizerState::Delayed(b) => write!(f, "Delayed({})", b.number),
202            SynchronizerState::Stale(b) => write!(f, "Stale({})", b.number),
203            SynchronizerState::Advanced(b) => write!(f, "Advanced({})", b.number),
204            SynchronizerState::Ended(reason) => write!(f, "Ended({})", reason),
205        }
206    }
207}
208
209pub struct SynchronizerStream {
210    extractor_id: ExtractorIdentity,
211    state: SynchronizerState,
212    error: Option<SynchronizerError>,
213    modify_ts: NaiveDateTime,
214    rx: Receiver<SyncResult<StateSyncMessage<BlockHeader>>>,
215}
216
217impl SynchronizerStream {
218    fn new(
219        extractor_id: &ExtractorIdentity,
220        rx: Receiver<SyncResult<StateSyncMessage<BlockHeader>>>,
221    ) -> Self {
222        Self {
223            extractor_id: extractor_id.clone(),
224            state: SynchronizerState::Started,
225            error: None,
226            modify_ts: Local::now().naive_utc(),
227            rx,
228        }
229    }
230    async fn try_advance(
231        &mut self,
232        block_history: &BlockHistory,
233        block_time: std::time::Duration,
234        latency_buffer: std::time::Duration,
235        stale_threshold: std::time::Duration,
236    ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
237        let extractor_id = self.extractor_id.clone();
238        let latest_block = block_history.latest();
239
240        match &self.state {
241            SynchronizerState::Started | SynchronizerState::Ended(_) => {
242                warn!(state=?&self.state, "Advancing Synchronizer in this state not supported!");
243                Ok(None)
244            }
245            SynchronizerState::Advanced(b) => {
246                let future_block = b.clone();
247                // Transition to ready once we arrived at the expected height
248                self.transition(future_block, block_history, stale_threshold)?;
249                Ok(None)
250            }
251            SynchronizerState::Ready(previous_block) => {
252                // Try to recv the next expected block, update state accordingly.
253                self.try_recv_next_expected(
254                    block_time + latency_buffer,
255                    block_history,
256                    previous_block.clone(),
257                    stale_threshold,
258                )
259                .await
260            }
261            SynchronizerState::Delayed(old_block) => {
262                // try to catch up all currently queued blocks until the expected block
263                debug!(
264                    %old_block,
265                    latest_block=opt(&latest_block),
266                    %extractor_id,
267                    "Trying to catch up to latest block"
268                );
269                self.try_catch_up(block_history, block_time + latency_buffer, stale_threshold)
270                    .await
271            }
272            SynchronizerState::Stale(old_block) => {
273                // try to catch up all currently queued blocks until the expected block
274                debug!(
275                    %old_block,
276                    latest_block=opt(&latest_block),
277                    %extractor_id,
278                    "Trying to catch up stale synchronizer to latest block"
279                );
280                self.try_catch_up(block_history, block_time, stale_threshold)
281                    .await
282            }
283        }
284    }
285
286    /// Standard way to advance a well-behaved state synchronizer.
287    ///
    /// Waits for a new block from the synchronizer within a timeout and modifies its
    /// state based on the outcome.
290    ///
291    /// ## Note
292    /// This method assumes that the current state is `Ready`.
293    async fn try_recv_next_expected(
294        &mut self,
295        max_wait: std::time::Duration,
296        block_history: &BlockHistory,
297        previous_block: BlockHeader,
298        stale_threshold: std::time::Duration,
299    ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
300        let extractor_id = self.extractor_id.clone();
301        match timeout(max_wait, self.rx.recv()).await {
302            Ok(Some(Ok(msg))) => {
303                self.transition(msg.header.clone(), block_history, stale_threshold)?;
304                Ok(Some(msg))
305            }
306            Ok(Some(Err(e))) => {
307                // The underlying synchronizer exhausted its retries
308                self.mark_errored(e);
309                Ok(None)
310            }
311            Ok(None) => {
312                // This case should not happen, as we shouldn't poll the synchronizer after we
313                // closed it or after it errored.
314                warn!(
315                    %extractor_id,
316                    "Tried to poll from closed synchronizer.",
317                );
318                self.mark_closed();
319                Ok(None)
320            }
321            Err(_) => {
322                // trying to advance a block timed out
323                debug!(%extractor_id, %previous_block, "No block received within time limit.");
324                // No need to consider state since we only call this method if we are
325                // in the Ready state.
326                self.state = SynchronizerState::Delayed(previous_block.clone());
327                self.modify_ts = Local::now().naive_utc();
328                Ok(None)
329            }
330        }
331    }
332
333    /// Tries to catch up a delayed state synchronizer.
334    ///
335    /// If a synchronizer is delayed, this method will try to catch up to the next expected block
336    /// by consuming all waiting messages in its queue and waiting for any new block messages
337    /// within a timeout. Finally, all update messages are merged into one and returned.
338    async fn try_catch_up(
339        &mut self,
340        block_history: &BlockHistory,
341        max_wait: std::time::Duration,
342        stale_threshold: std::time::Duration,
343    ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
344        let mut results = Vec::new();
345        let extractor_id = self.extractor_id.clone();
346
347        // Set a deadline for the overall catch-up operation
348        let deadline = std::time::Instant::now() + max_wait;
349
350        while std::time::Instant::now() < deadline {
351            match timeout(
352                deadline.saturating_duration_since(std::time::Instant::now()),
353                self.rx.recv(),
354            )
355            .await
356            {
357                Ok(Some(Ok(msg))) => {
358                    debug!(%extractor_id, block=%msg.header, "Received new message during catch-up");
359                    let block_pos = block_history.determine_block_position(&msg.header)?;
360                    results.push(msg);
361                    if matches!(block_pos, BlockPosition::NextExpected) {
362                        break;
363                    }
364                }
365                Ok(Some(Err(e))) => {
366                    // Synchronizer errored during catch up
367                    self.mark_errored(e);
368                    return Ok(None);
369                }
370                Ok(None) => {
371                    // This case should not happen, as we shouldn't poll the synchronizer after we
372                    // closed it or after it errored.
373                    warn!(
374                        %extractor_id,
375                        "Tried to poll from closed synchronizer during catch up.",
376                    );
377                    self.mark_closed();
378                    return Ok(None)
379                }
380                Err(_) => {
381                    debug!(%extractor_id, "Timed out waiting for catch-up");
382                    break;
383                }
384            }
385        }
386
387        let merged = results
388            .into_iter()
389            .reduce(|l, r| l.merge(r));
390
391        if let Some(msg) = merged {
392            // we were able to get at least one block out
393            debug!(%extractor_id, "Delayed extractor made progress!");
394            self.transition(msg.header.clone(), block_history, stale_threshold)?;
395            Ok(Some(msg))
396        } else {
397            // No progress made during catch-up, check if we should go stale
398            self.check_and_transition_to_stale_if_needed(stale_threshold, None)?;
399            Ok(None)
400        }
401    }
402
403    /// Helper method to check if synchronizer should transition to stale based on time elapsed
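    ///
    /// Returns `Ok(true)` if the stream was transitioned to stale, `Ok(false)` otherwise.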
404    fn check_and_transition_to_stale_if_needed(
405        &mut self,
406        stale_threshold: std::time::Duration,
407        fallback_header: Option<BlockHeader>,
408    ) -> Result<bool, BlockSynchronizerError> {
409        let now = Local::now().naive_utc();
410        let wait_duration = now.signed_duration_since(self.modify_ts);
411        let stale_threshold_chrono = ChronoDuration::from_std(stale_threshold)
412            .map_err(|e| BlockSynchronizerError::DurationConversionError(e.to_string()))?;
413
414        if wait_duration > stale_threshold_chrono {
415            let header_to_use = match (&self.state, fallback_header) {
416                (SynchronizerState::Ready(h), _) |
417                (SynchronizerState::Delayed(h), _) |
418                (SynchronizerState::Stale(h), _) => h.clone(),
419                (_, Some(h)) => h,
420                _ => BlockHeader::default(),
421            };
422
423            warn!(
424                extractor_id=%self.extractor_id,
425                last_message_at=?self.modify_ts,
426                "SynchronizerStream transition to stale due to timeout."
427            );
428            self.state = SynchronizerState::Stale(header_to_use);
429            self.modify_ts = now;
430            Ok(true)
431        } else {
432            Ok(false)
433        }
434    }
435
436    /// Logic to transition a state synchronizer based on newly received block
437    ///
438    /// Updates the synchronizer's state according to the position of the received block:
439    /// - Next expected block -> Ready state
    /// - Latest/Delayed block -> Either Delayed or Stale (if the stale threshold has elapsed since the last update)
441    /// - Advanced block -> Advanced state (block ahead of expected position)
442    fn transition(
443        &mut self,
444        latest_retrieved: BlockHeader,
445        block_history: &BlockHistory,
446        stale_threshold: std::time::Duration,
447    ) -> Result<(), BlockSynchronizerError> {
448        let extractor_id = self.extractor_id.clone();
449        let last_message_at = self.modify_ts;
450        let block = &latest_retrieved;
451
452        match block_history.determine_block_position(&latest_retrieved)? {
453            BlockPosition::NextExpected => {
454                self.state = SynchronizerState::Ready(latest_retrieved.clone());
455                trace!(
456                    next = %latest_retrieved,
457                    extractor = %extractor_id,
458                    "SynchronizerStream transition to next expected"
459                )
460            }
461            BlockPosition::Latest | BlockPosition::Delayed => {
462                if !self.check_and_transition_to_stale_if_needed(
463                    stale_threshold,
464                    Some(latest_retrieved.clone()),
465                )? {
466                    warn!(
467                        %extractor_id,
468                        ?last_message_at,
469                        %block,
                        "SynchronizerStream transition to delayed."
471                    );
472                    self.state = SynchronizerState::Delayed(latest_retrieved.clone());
473                }
474            }
475            BlockPosition::Advanced => {
476                info!(
477                    %extractor_id,
478                    ?last_message_at,
479                    latest = opt(&block_history.latest()),
480                    %block,
481                    "SynchronizerStream transition to advanced."
482                );
483                self.state = SynchronizerState::Advanced(latest_retrieved.clone());
484            }
485        }
486        self.modify_ts = Local::now().naive_utc();
487        Ok(())
488    }
489
490    /// Marks this stream as errored
491    ///
492    /// Sets an error and transitions the stream to Ended, correctly recording the
493    /// time at which this happened.
494    fn mark_errored(&mut self, error: SynchronizerError) {
495        self.state = SynchronizerState::Ended(error.to_string());
496        self.modify_ts = Local::now().naive_utc();
497        self.error = Some(error);
498    }
499
500    /// Marks a stream as closed.
501    ///
    /// If the stream has not been ended previously, e.g. by an error, it will be marked
    /// as Ended without an error. This should not happen, since we should stop consuming
    /// from the stream once an error has occurred.
505    fn mark_closed(&mut self) {
506        if !matches!(self.state, SynchronizerState::Ended(_)) {
507            self.state = SynchronizerState::Ended("Closed".to_string());
508            self.modify_ts = Local::now().naive_utc();
509        }
510    }
511
512    /// Marks a stream as stale.
513    fn mark_stale(&mut self, header: &BlockHeader) {
514        self.state = SynchronizerState::Stale(header.clone());
515        self.modify_ts = Local::now().naive_utc();
516    }
517
518    /// Marks this stream as ready.
519    fn mark_ready(&mut self, header: &BlockHeader) {
520        self.state = SynchronizerState::Ready(header.clone());
521        self.modify_ts = Local::now().naive_utc();
522    }
523
524    fn has_ended(&self) -> bool {
525        matches!(self.state, SynchronizerState::Ended(_))
526    }
527
528    fn is_stale(&self) -> bool {
529        matches!(self.state, SynchronizerState::Stale(_))
530    }
531
532    fn is_advanced(&self) -> bool {
533        matches!(self.state, SynchronizerState::Advanced(_))
534    }
535
    /// Gets the stream's current header, if the stream is active.
    ///
    /// A stream is considered active unless it has ended or is stale.
539    fn get_current_header(&self) -> Option<&BlockHeader> {
540        match &self.state {
541            SynchronizerState::Ready(b) |
542            SynchronizerState::Delayed(b) |
543            SynchronizerState::Advanced(b) => Some(b),
544            _ => None,
545        }
546    }
547}
548
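/// A single message of the merged block feed.
///
/// Maps extractor names to the state update they emitted for this block (`state_msgs`;
/// extractors that did not deliver in time are absent) and reports the current state of
/// every registered synchronizer (`sync_states`).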
549#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
550pub struct FeedMessage<H = BlockHeader>
551where
552    H: HeaderLike,
553{
554    pub state_msgs: HashMap<String, StateSyncMessage<H>>,
555    pub sync_states: HashMap<String, SynchronizerState>,
556}
557
558impl<H> FeedMessage<H>
559where
560    H: HeaderLike,
561{
562    fn new(
563        state_msgs: HashMap<String, StateSyncMessage<H>>,
564        sync_states: HashMap<String, SynchronizerState>,
565    ) -> Self {
566        Self { state_msgs, sync_states }
567    }
568}
569
570impl<S> BlockSynchronizer<S>
571where
572    S: StateSynchronizer,
573{
574    pub fn new(
575        block_time: std::time::Duration,
576        latency_buffer: std::time::Duration,
577        max_missed_blocks: u64,
578    ) -> Self {
579        Self {
580            synchronizers: None,
581            max_messages: None,
582            block_time,
583            latency_buffer,
584            startup_timeout: block_time.mul_f64(max_missed_blocks as f64),
585            max_missed_blocks,
586        }
587    }
588
    /// Limits the stream to emit a maximum number of messages.
    ///
    /// After the stream has emitted the maximum number of messages it will end. This is
    /// mainly useful for testing, or if you only want to process a fixed number of
    /// messages and then terminate cleanly.
594    pub fn max_messages(&mut self, val: usize) {
595        self.max_messages = Some(val);
596    }
597
598    /// Sets timeout for the first message of a protocol.
599    ///
600    /// Time to wait for the full first message, including snapshot retrieval.
    pub fn startup_timeout(mut self, val: Duration) -> Self {
        self.startup_timeout = val;
        self
    }
604
605    pub fn register_synchronizer(mut self, id: ExtractorIdentity, synchronizer: S) -> Self {
606        let mut registered = self.synchronizers.unwrap_or_default();
607        registered.insert(id, synchronizer);
608        self.synchronizers = Some(registered);
609        self
610    }
611
612    #[cfg(test)]
613    pub fn with_short_timeouts() -> Self {
614        Self::new(Duration::from_millis(10), Duration::from_millis(10), 3)
615    }
616
617    /// Cleanup function for shutting down remaining synchronizers when the nanny detects an error.
618    /// Sends close signals to all remaining synchronizers and waits for them to complete.
619    async fn cleanup_synchronizers(
620        mut state_sync_tasks: FuturesUnordered<JoinHandle<()>>,
621        sync_close_senders: Vec<oneshot::Sender<()>>,
622    ) {
623        // Send close signals to all remaining synchronizers
624        for close_sender in sync_close_senders {
625            let _ = close_sender.send(());
626        }
627
628        // Await remaining tasks with timeout
629        let mut completed_tasks = 0;
630        while let Ok(Some(_)) = timeout(Duration::from_secs(5), state_sync_tasks.next()).await {
631            completed_tasks += 1;
632        }
633
634        // Warn if any synchronizers timed out during cleanup
635        let remaining_tasks = state_sync_tasks.len();
636        if remaining_tasks > 0 {
637            warn!(
638                completed = completed_tasks,
639                timed_out = remaining_tasks,
640                "Some synchronizers timed out during cleanup and may not have shut down cleanly"
641            );
642        }
643    }
644
645    /// Starts the synchronization of streams.
646    ///
647    /// Will error directly if the startup fails. Once the startup is complete, it will
648    /// communicate any fatal errors through the stream before closing it.
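    ///
    /// A minimal consumption sketch (error handling shown for both the startup call and
    /// in-stream errors; `block_sync` is assumed to be a fully registered instance):
    /// ```ignore
    /// let (nanny, mut rx) = block_sync.run().await?; // fails fast on startup errors
    /// while let Some(msg) = rx.recv().await {
    ///     match msg {
    ///         Ok(feed) => { /* process feed.state_msgs */ }
    ///         Err(e) => {
    ///             eprintln!("feed ended: {e}");
    ///             break;
    ///         }
    ///     }
    /// }
    /// nanny.await.ok(); // wait for synchronizer cleanup to finish
    /// ```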
649    pub async fn run(
650        mut self,
651    ) -> BlockSyncResult<(JoinHandle<()>, Receiver<BlockSyncResult<FeedMessage<BlockHeader>>>)>
652    {
653        trace!("Starting BlockSynchronizer...");
654        let state_sync_tasks = FuturesUnordered::new();
655        let mut synchronizers = self
656            .synchronizers
657            .take()
658            .ok_or(BlockSynchronizerError::NoSynchronizers)?;
659        // init synchronizers
660        let init_tasks = synchronizers
661            .values_mut()
662            .map(|s| s.initialize())
663            .collect::<Vec<_>>();
664        try_join_all(init_tasks).await?;
665
666        let mut sync_streams = HashMap::with_capacity(synchronizers.len());
667        let mut sync_close_senders = Vec::new();
668        for (extractor_id, synchronizer) in synchronizers.drain() {
669            let (handle, rx) = synchronizer.start().await;
670            let (join_handle, close_sender) = handle.split();
671            state_sync_tasks.push(join_handle);
672            sync_close_senders.push(close_sender);
673
674            sync_streams.insert(extractor_id.clone(), SynchronizerStream::new(&extractor_id, rx));
675        }
676
677        // startup, schedule first set of futures and wait for them to return to initialise
678        // synchronizers.
679        debug!("Waiting for initial synchronizer messages...");
680        let mut startup_futures = Vec::new();
681        for (id, sh) in sync_streams.iter_mut() {
682            let fut = async {
683                let res = timeout(self.startup_timeout, sh.rx.recv()).await;
684                (id.clone(), res)
685            };
686            startup_futures.push(fut);
687        }
688        let mut ready_sync_msgs = HashMap::new();
        let initial_headers = join_all(startup_futures)
            .await
            .into_iter()
            .filter_map(|(extractor_id, res)| {
                let synchronizer = sync_streams
                    .get_mut(&extractor_id)
                    .unwrap();
                match res {
                    Ok(Some(Ok(msg))) => {
                        debug!(%extractor_id, height=?&msg.header.number, "Synchronizer started successfully!");
                        // initially default all synchronizers to Ready
                        synchronizer.mark_ready(&msg.header);
                        ready_sync_msgs.insert(extractor_id.name.clone(), msg.clone());
                        Some(msg.header)
                    }
                    Ok(Some(Err(e))) => {
                        synchronizer.mark_errored(e);
                        None
                    }
                    Ok(None) => {
                        // The synchronizer closed its channel. This can only happen if its run
                        // task ended; before that it should have sent an error, so we likely
                        // don't need to handle this case explicitly.
                        warn!(%extractor_id, "Synchronizer closed during startup");
                        synchronizer.mark_closed();
                        None
                    }
                    Err(_) => {
                        // We got an error because the synchronizer timed out during startup
                        warn!(%extractor_id, "Timed out waiting for first message");
                        synchronizer.mark_stale(&BlockHeader::default());
                        None
                    }
                }
            })
            .collect::<HashSet<_>>() // remove duplicates
            .into_iter()
            .collect::<Vec<_>>();
728
729        // Ensures we have at least one ready stream
730        Self::check_streams(&sync_streams)?;
731        let mut block_history = BlockHistory::new(initial_headers, 15)?;
732        // Determine the starting header for synchronization
733        let start_header = block_history
734            .latest()
735            .expect("Safe since we checked streams before");
736        info!(
737            start_block=%start_header,
738            n_healthy=ready_sync_msgs.len(),
739            n_total=sync_streams.len(),
740            "Block synchronization started successfully!"
741        );
742
743        // Determine correct state for each remaining synchronizer, based on their header vs the
744        // latest one
745        for (_, stream) in sync_streams.iter_mut() {
746            if let SynchronizerState::Ready(header) = stream.state.clone() {
747                if header.number < start_header.number {
748                    debug!(
749                        extractor_id=%stream.extractor_id,
750                        synchronizer_block=header.number,
751                        current_block=start_header.number,
752                        "Marking synchronizer as delayed during initialization"
753                    );
754                    stream.state = SynchronizerState::Delayed(header);
755                }
756            }
757        }
758
759        let (sync_tx, sync_rx) = mpsc::channel(30);
760        let main_loop_jh = tokio::spawn(async move {
761            let mut n_iter = 1;
762            loop {
763                // Send retrieved data to receivers.
764                let msg = FeedMessage::new(
765                    std::mem::take(&mut ready_sync_msgs),
766                    sync_streams
767                        .iter()
768                        .map(|(a, b)| (a.name.to_string(), b.state.clone()))
769                        .collect(),
770                );
771                if sync_tx.send(Ok(msg)).await.is_err() {
772                    info!("Receiver closed, block synchronizer terminating..");
773                    return;
774                };
775
776                // Check if we have reached the max messages
777                if let Some(max_messages) = self.max_messages {
778                    if n_iter >= max_messages {
779                        info!(max_messages, "StreamEnd");
780                        return;
781                    }
782                }
783                n_iter += 1;
784
785                let res = self
786                    .handle_next_message(
787                        &mut sync_streams,
788                        &mut ready_sync_msgs,
789                        &mut block_history,
790                    )
791                    .await;
792
793                if let Err(e) = res {
794                    // Communicate error to clients, then end the loop
795                    let _ = sync_tx.send(Err(e)).await;
796                    return;
797                }
798            }
799        });
800
801        // We await the main loop and log any panics (should be impossible). If the
802        // main loop exits, all synchronizers should be ended or stale. So we kill any
803        // remaining stale ones just in case. A final error is propagated through the
804        // channel to the user.
805        let nanny_jh = tokio::spawn(async move {
806            // report any panics
807            let _ = main_loop_jh.await.map_err(|e| {
808                if e.is_panic() {
                    error!("BlockSynchronizer main loop panicked: {e}")
810                }
811            });
812            debug!("Main loop exited. Closing synchronizers");
813            Self::cleanup_synchronizers(state_sync_tasks, sync_close_senders).await;
814            debug!("Shutdown complete");
815        });
816        Ok((nanny_jh, sync_rx))
817    }
818
819    /// Retrieves next message from synchronizers
820    ///
    /// The result is written into `ready_sync_msgs`. Errors only if there is a
822    /// non-recoverable error or all synchronizers have ended.
823    async fn handle_next_message(
824        &self,
825        sync_streams: &mut HashMap<ExtractorIdentity, SynchronizerStream>,
826        ready_sync_msgs: &mut HashMap<String, StateSyncMessage<BlockHeader>>,
827        block_history: &mut BlockHistory,
828    ) -> BlockSyncResult<()> {
829        let mut recv_futures = Vec::new();
830        for (extractor_id, stream) in sync_streams.iter_mut() {
            // If the stream is in the ended state, do not check for any messages (its
            // receiver is closed), but do check stale streams.
833            if stream.has_ended() {
834                continue
835            }
836            // Here we simply wait block_time + max_wait. This will not work for chains with
837            // unknown block times but is simple enough for now.
838            // If we would like to support unknown block times we could: Instruct all handles to
839            // await the max block time, if a header arrives within that time transition as
840            // usual, but via a select statement get notified (using e.g. Notify) if any other
841            // handle finishes before the timeout. Then await again but this time only for
842            // max_wait and then proceed as usual. So basically each try_advance task would have
843            // a select statement that allows it to exit the first timeout preemptively if any
844            // other try_advance task finished earlier.
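            //
            // A rough sketch of that select-based wait (not implemented here; `notify` and
            // the timeouts are illustrative only):
            //
            //     let notify = Arc::new(Notify::new());
            //     tokio::select! {
            //         res = timeout(max_block_time, stream.rx.recv()) => {
            //             notify.notify_waiters(); // wake the other handles early
            //             /* transition as usual */
            //         }
            //         _ = notify.notified() => {
            //             // another stream already produced a header, only wait max_wait more
            //             let _ = timeout(max_wait, stream.rx.recv()).await;
            //         }
            //     }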
845            recv_futures.push(async {
846                let res = stream
847                    .try_advance(
848                        block_history,
849                        self.block_time,
850                        self.latency_buffer,
851                        self.block_time
852                            .mul_f64(self.max_missed_blocks as f64),
853                    )
854                    .await?;
855                Ok::<_, BlockSynchronizerError>(res.map(|msg| (extractor_id.name.clone(), msg)))
856            });
857        }
858        ready_sync_msgs.extend(
859            join_all(recv_futures)
860                .await
861                .into_iter()
862                .collect::<Result<Vec<_>, _>>()?
863                .into_iter()
864                .flatten(),
865        );
866
867        // Check if we have any active synchronizers (Ready, Delayed, or Advanced)
868        // If all synchronizers have been purged (Stale/Ended), exit the main loop
869        Self::check_streams(sync_streams)?;
870
871        // if we have any advanced header, we reinit the block history,
872        // else we simply advance the existing history
873        if sync_streams
874            .values()
875            .any(SynchronizerStream::is_advanced)
876        {
877            *block_history = Self::reinit_block_history(sync_streams, block_history)?;
878        } else {
879            let header = sync_streams
880                .values()
881                .filter_map(SynchronizerStream::get_current_header)
882                .max_by_key(|b| b.number)
883                .expect("Active streams are present, since we checked above");
884            block_history.push(header.clone())?;
885        }
886        Ok(())
887    }
888
    /// Reinitialises the block history and reclassifies the states of active synchronizers.
    ///
    /// We call this if we detect a future, detached block. This usually only happens if
    /// a synchronizer restarts.
    ///
    /// ## Note
    /// This method assumes that at least one synchronizer is in the Advanced, Ready or
    /// Delayed state; it will panic if this is not the case.
897    fn reinit_block_history(
898        sync_streams: &mut HashMap<ExtractorIdentity, SynchronizerStream>,
899        block_history: &mut BlockHistory,
900    ) -> Result<BlockHistory, BlockSynchronizerError> {
901        let previous = block_history.latest().expect(
902            "Old block history should not be empty, startup should have populated it at this point",
903        );
904        let blocks = sync_streams
905            .values()
906            .filter_map(SynchronizerStream::get_current_header)
907            .cloned()
908            .collect();
909        let new_block_history = BlockHistory::new(blocks, 10)?;
        let latest = new_block_history
            .latest()
            .expect("Block history should not be empty, we just populated it.");
        info!(
            %previous,
            %latest,
            "Advanced synchronizer detected. Reinitialized block history."
        );
918        sync_streams
919            .values_mut()
920            .for_each(|stream| {
921                // we only get headers from advanced, ready and delayed so stale
922                // or ended streams are not considered here
923                if let Some(header) = stream.get_current_header() {
924                    if header.number < latest.number {
925                        stream.state = SynchronizerState::Delayed(header.clone());
926                    } else if header.number == latest.number {
927                        stream.state = SynchronizerState::Ready(header.clone());
928                    }
929                }
930            });
931        Ok(new_block_history)
932    }
933
    /// Checks that we still have at least one active stream, otherwise errors.
    ///
    /// If there are no active streams, meaning all have ended or are stale, it returns a
    /// summary error message describing the state of all synchronizers.
938    fn check_streams(
939        sync_streams: &HashMap<ExtractorIdentity, SynchronizerStream>,
940    ) -> BlockSyncResult<()> {
941        if sync_streams
942            .values()
943            .all(|stream| stream.has_ended() | stream.is_stale())
944        {
945            let mut reason = Vec::new();
946            if let Some((last_errored_id, last_errored_stream)) = sync_streams
947                .iter()
948                .filter(|(_, stream)| stream.has_ended() | stream.is_stale())
949                .max_by_key(|(_, stream)| stream.modify_ts)
950            {
951                if let Some(err) = &last_errored_stream.error {
952                    // All synchronizers were errored/stale and the last one errored
953                    reason.push(format!("Synchronizer for {last_errored_id} errored with: {err}"))
954                } else {
                    // All synchronizers were errored/stale and the last one also became stale
956                    reason.push(format!(
957                        "Synchronizer for {last_errored_id} became: {}",
958                        last_errored_stream.state
959                    ))
960                }
961            } else {
962                reason.push(
963                    "Can't identify protocol that caused the stream to end! \
964                    This condition should be unreachable!"
965                        .to_string(),
966                )
967            }
968
969            sync_streams
970                .iter()
971                .for_each(|(id, stream)| {
972                    reason
973                        .push(format!("{id} reported as {} at {}", stream.state, stream.modify_ts))
974                });
975
976            return Err(BlockSynchronizerError::NoReadySynchronizers(reason.join(", ")));
977        }
978        Ok(())
979    }
980}
981
982#[cfg(test)]
983mod tests {
984    use std::sync::Arc;
985
986    use async_trait::async_trait;
987    use test_log::test;
988    use tokio::sync::{oneshot, Mutex};
989    use tycho_common::dto::Chain;
990
991    use super::*;
992    use crate::feed::synchronizer::{SyncResult, SynchronizerTaskHandle};
993
994    #[derive(Clone, Debug)]
995    enum MockBehavior {
996        Normal,          // Exit successfully when receiving close signal
997        IgnoreClose,     // Ignore close signals and hang (for timeout testing)
998        ExitImmediately, // Exit immediately after first message (for quick failure testing)
999    }
1000
1001    type HeaderReceiver = Receiver<SyncResult<StateSyncMessage<BlockHeader>>>;
1002
1003    #[derive(Clone)]
1004    struct MockStateSync {
1005        header_tx: mpsc::Sender<SyncResult<StateSyncMessage<BlockHeader>>>,
1006        header_rx: Arc<Mutex<Option<HeaderReceiver>>>,
1007        close_received: Arc<Mutex<bool>>,
1008        behavior: MockBehavior,
1009        // For testing: store the close sender so tests can trigger close signals
1010        close_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
1011    }
1012
1013    impl MockStateSync {
1014        fn new() -> Self {
1015            Self::with_behavior(MockBehavior::Normal)
1016        }
1017
1018        fn with_behavior(behavior: MockBehavior) -> Self {
1019            let (tx, rx) = mpsc::channel(1);
1020            Self {
1021                header_tx: tx,
1022                header_rx: Arc::new(Mutex::new(Some(rx))),
1023                close_received: Arc::new(Mutex::new(false)),
1024                behavior,
1025                close_tx: Arc::new(Mutex::new(None)),
1026            }
1027        }
1028
1029        async fn was_close_received(&self) -> bool {
1030            *self.close_received.lock().await
1031        }
1032
1033        async fn send_header(&self, header: StateSyncMessage<BlockHeader>) -> Result<(), String> {
1034            self.header_tx
1035                .send(Ok(header))
1036                .await
1037                .map_err(|e| format!("sending header failed: {e}"))
1038        }
1039
1040        // For testing: trigger a close signal to make the synchronizer exit
1041        async fn trigger_close(&self) {
1042            if let Some(close_tx) = self.close_tx.lock().await.take() {
1043                let _ = close_tx.send(());
1044            }
1045        }
1046    }
1047
1048    #[async_trait]
1049    impl StateSynchronizer for MockStateSync {
1050        async fn initialize(&mut self) -> SyncResult<()> {
1051            Ok(())
1052        }
1053
1054        async fn start(
1055            mut self,
1056        ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>) {
1057            let block_rx = {
1058                let mut guard = self.header_rx.lock().await;
1059                guard
1060                    .take()
1061                    .expect("Block receiver was not set!")
1062            };
1063
1064            // Create close channel - we need to store one sender for testing and give one to the
1065            // handle
1066            let (close_tx_for_handle, close_rx) = oneshot::channel();
1067            let (close_tx_for_test, close_rx_for_test) = oneshot::channel();
1068
1069            // Store the test close sender
1070            {
1071                let mut guard = self.close_tx.lock().await;
1072                *guard = Some(close_tx_for_test);
1073            }
1074
1075            let behavior = self.behavior.clone();
1076            let close_received_clone = self.close_received.clone();
1077            let tx = self.header_tx.clone();
1078
1079            let jh = tokio::spawn(async move {
1080                match behavior {
1081                    MockBehavior::IgnoreClose => {
1082                        // Infinite loop to simulate a hung synchronizer that doesn't respond to
1083                        // close signals
1084                        loop {
1085                            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
1086                        }
1087                    }
1088                    MockBehavior::ExitImmediately => {
1089                        // Exit immediately with error to simulate immediate task failure
1090                        tx.send(SyncResult::Err(SynchronizerError::ConnectionError(
1091                            "Simulated immediate task failure".to_string(),
1092                        )))
1093                        .await
1094                        .unwrap();
1095                    }
1096                    MockBehavior::Normal => {
1097                        // Wait for close signal from either handle or test, then respond based on
1098                        // behavior
1099                        let _ = tokio::select! {
1100                            result = close_rx => result,
1101                            result = close_rx_for_test => result,
1102                        };
1103                        let mut guard = close_received_clone.lock().await;
1104                        *guard = true;
1105                    }
1106                }
1107            });
1108
1109            let handle = SynchronizerTaskHandle::new(jh, close_tx_for_handle);
1110            (handle, block_rx)
1111        }
1112    }
1113
1114    fn header_message(block: u8) -> StateSyncMessage<BlockHeader> {
1115        StateSyncMessage {
1116            header: BlockHeader {
1117                number: block as u64,
1118                hash: Bytes::from(vec![block]),
1119                parent_hash: Bytes::from(vec![block - 1]),
1120                revert: false,
1121                timestamp: 1000,
1122            },
1123            ..Default::default()
1124        }
1125    }
1126
1127    async fn receive_message(rx: &mut Receiver<BlockSyncResult<FeedMessage>>) -> FeedMessage {
1128        timeout(Duration::from_millis(100), rx.recv())
1129            .await
1130            .expect("Responds in time")
1131            .expect("Should receive first message")
1132            .expect("No error")
1133    }
1134
1135    async fn setup_block_sync(
1136    ) -> (MockStateSync, MockStateSync, JoinHandle<()>, Receiver<BlockSyncResult<FeedMessage>>)
1137    {
1138        setup_block_sync_with_behaviour(MockBehavior::Normal, MockBehavior::Normal).await
1139    }
1140
1141    // Starts up a synchronizer and consumes the first message on block 1.
1142    async fn setup_block_sync_with_behaviour(
1143        v2_behavior: MockBehavior,
1144        v3_behavior: MockBehavior,
1145    ) -> (MockStateSync, MockStateSync, JoinHandle<()>, Receiver<BlockSyncResult<FeedMessage>>)
1146    {
1147        let v2_sync = MockStateSync::with_behavior(v2_behavior);
1148        let v3_sync = MockStateSync::with_behavior(v3_behavior);
1149
1150        // Use reasonable timeouts to observe proper state transitions
1151        let mut block_sync = BlockSynchronizer::new(
1152            Duration::from_millis(20), // block_time
            Duration::from_millis(10), // latency_buffer
1154            3,                         // max_missed_blocks (stale threshold = 20ms * 3 = 60ms)
1155        );
1156        block_sync.max_messages(10); // Allow enough messages to see the progression
1157
1158        let block_sync = block_sync
1159            .register_synchronizer(
1160                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
1161                v2_sync.clone(),
1162            )
1163            .register_synchronizer(
1164                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
1165                v3_sync.clone(),
1166            );
1167
1168        // Send initial messages to both synchronizers
1169        let block1_msg = header_message(1);
1170        let _ = v2_sync
1171            .send_header(block1_msg.clone())
1172            .await;
1173        let _ = v3_sync
1174            .send_header(block1_msg.clone())
1175            .await;
1176
1177        // Start the block synchronizer
1178        let (nanny_handle, mut rx) = block_sync
1179            .run()
1180            .await
1181            .expect("BlockSynchronizer failed to start");
1182
1183        let first_feed_msg = receive_message(&mut rx).await;
1184        assert_eq!(first_feed_msg.state_msgs.len(), 2);
1185        assert!(matches!(
1186            first_feed_msg
1187                .sync_states
1188                .get("uniswap-v2")
1189                .unwrap(),
1190            SynchronizerState::Ready(_)
1191        ));
1192        assert!(matches!(
1193            first_feed_msg
1194                .sync_states
1195                .get("uniswap-v3")
1196                .unwrap(),
1197            SynchronizerState::Ready(_)
1198        ));
1199
1200        (v2_sync, v3_sync, nanny_handle, rx)
1201    }
1202
1203    async fn shutdown_block_synchronizer(
1204        v2_sync: &MockStateSync,
1205        v3_sync: &MockStateSync,
1206        nanny_handle: JoinHandle<()>,
1207    ) {
1208        v3_sync.trigger_close().await;
1209        v2_sync.trigger_close().await;
1210
1211        timeout(Duration::from_millis(100), nanny_handle)
1212            .await
1213            .expect("Nanny failed to exit within time")
1214            .expect("Nanny panicked");
1215    }
1216
1217    #[test(tokio::test)]
1218    async fn test_two_ready_synchronizers() {
1219        let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1220
1221        let second_msg = header_message(2);
1222        v2_sync
1223            .send_header(second_msg.clone())
1224            .await
1225            .expect("send_header failed");
1226        v3_sync
1227            .send_header(second_msg.clone())
1228            .await
1229            .expect("send_header failed");
1230        let second_feed_msg = receive_message(&mut rx).await;
1231
1232        let exp2 = FeedMessage {
1233            state_msgs: [
1234                ("uniswap-v2".to_string(), second_msg.clone()),
1235                ("uniswap-v3".to_string(), second_msg.clone()),
1236            ]
1237            .into_iter()
1238            .collect(),
1239            sync_states: [
1240                ("uniswap-v3".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
1241                ("uniswap-v2".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
1242            ]
1243            .into_iter()
1244            .collect(),
1245        };
1246        assert_eq!(second_feed_msg, exp2);
1247
1248        shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1249    }
1250
1251    #[test(tokio::test)]
1252    async fn test_delayed_synchronizer_catches_up() {
1253        let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1254
1255        // Send block 2 to v2 synchronizer only
1256        let block2_msg = header_message(2);
1257        v2_sync
1258            .send_header(block2_msg.clone())
1259            .await
1260            .expect("send_header failed");
1261
1262        // Consume second message - v3 should be delayed
1263        let second_feed_msg = receive_message(&mut rx).await;
1264        debug!("Consumed second message for v2");
1265
1266        assert!(second_feed_msg
1267            .state_msgs
1268            .contains_key("uniswap-v2"));
1269        assert!(matches!(
1270            second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1271            SynchronizerState::Ready(header) if header.number == 2
1272        ));
1273        assert!(!second_feed_msg
1274            .state_msgs
1275            .contains_key("uniswap-v3"));
1276        assert!(matches!(
1277            second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1278            SynchronizerState::Delayed(header) if header.number == 1
1279        ));
1280
1281        // Now v3 catches up to block 2
1282        v3_sync
1283            .send_header(block2_msg.clone())
1284            .await
1285            .expect("send_header failed");
1286
1287        // Both advance to block 3
1288        let block3_msg = header_message(3);
1289        v2_sync
1290            .send_header(block3_msg.clone())
1291            .await
1292            .expect("send_header failed");
1293        v3_sync
1294            .send_header(block3_msg)
1295            .await
1296            .expect("send_header failed");
1297
1298        // Consume messages until we get both synchronizers on block 3
1299        // We may get an intermediate message for v3's catch-up or a combined message
1300        let mut third_feed_msg = receive_message(&mut rx).await;
1301
        // If this message doesn't contain uniswap-v2, it's an intermediate message, so we get the
        // next one
1304        if !third_feed_msg
1305            .state_msgs
1306            .contains_key("uniswap-v2")
1307        {
1308            third_feed_msg = rx
1309                .recv()
1310                .await
1311                .expect("header channel was closed")
1312                .expect("no error");
1313        }
1314        assert!(third_feed_msg
1315            .state_msgs
1316            .contains_key("uniswap-v2"));
1317        assert!(third_feed_msg
1318            .state_msgs
1319            .contains_key("uniswap-v3"));
1320        assert!(matches!(
1321            third_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1322            SynchronizerState::Ready(header) if header.number == 3
1323        ));
1324        assert!(matches!(
1325            third_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1326            SynchronizerState::Ready(header) if header.number == 3
1327        ));
1328
1329        shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1330    }
1331
1332    #[test(tokio::test)]
1333    async fn test_different_start_blocks() {
1334        let v2_sync = MockStateSync::new();
1335        let v3_sync = MockStateSync::new();
1336        let block_sync = BlockSynchronizer::with_short_timeouts()
1337            .register_synchronizer(
1338                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
1339                v2_sync.clone(),
1340            )
1341            .register_synchronizer(
1342                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
1343                v3_sync.clone(),
1344            );
1345
1346        // Initial messages - synchronizers at different blocks
1347        let block1_msg = header_message(1);
1348        let block2_msg = header_message(2);
1349
1350        let _ = v2_sync
1351            .send_header(block1_msg.clone())
1352            .await;
1353        v3_sync
1354            .send_header(block2_msg.clone())
1355            .await
1356            .expect("send_header failed");
1357
1358        // Start the block synchronizer - it should use block 2 as the starting block
1359        let (jh, mut rx) = block_sync
1360            .run()
1361            .await
1362            .expect("BlockSynchronizer failed to start.");
1363
1364        // Consume first message
1365        let first_feed_msg = receive_message(&mut rx).await;
1366        assert!(matches!(
1367            first_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1368            SynchronizerState::Delayed(header) if header.number == 1
1369        ));
1370        assert!(matches!(
1371            first_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1372            SynchronizerState::Ready(header) if header.number == 2
1373        ));
1374
1375        // Now v2 catches up to block 2
1376        v2_sync
1377            .send_header(block2_msg.clone())
1378            .await
1379            .expect("send_header failed");
1380
1381        // Both advance to block 3
1382        let block3_msg = header_message(3);
1383        let _ = v2_sync
1384            .send_header(block3_msg.clone())
1385            .await;
1386        v3_sync
1387            .send_header(block3_msg.clone())
1388            .await
1389            .expect("send_header failed");
1390
1391        // Consume the next message - both synchronizers should now be on block 3
1392        let second_feed_msg = receive_message(&mut rx).await;
1393        assert_eq!(second_feed_msg.state_msgs.len(), 2);
1394        assert!(matches!(
1395            second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1396            SynchronizerState::Ready(header) if header.number == 3
1397        ));
1398        assert!(matches!(
1399            second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1400            SynchronizerState::Ready(header) if header.number == 3
1401        ));
1402
1403        shutdown_block_synchronizer(&v2_sync, &v3_sync, jh).await;
1404    }
1405
1406    #[test(tokio::test)]
1407    async fn test_synchronizer_fails_other_goes_stale() {
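        // v2 exits immediately while v3 behaves normally but receives no further headers. The
        // assertions below expect v2 to be reported as Ended, v3 as Delayed, an error to be
        // surfaced on the channel, and the nanny to close the remaining synchronizer on cleanup.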
1408        let (_v2_sync, v3_sync, nanny_handle, mut sync_rx) =
1409            setup_block_sync_with_behaviour(MockBehavior::ExitImmediately, MockBehavior::Normal)
1410                .await;
1411
1412        let mut error_reported = false;
1413        for _ in 0..3 {
1414            if let Some(msg) = sync_rx.recv().await {
1415                match msg {
1416                    Err(_) => error_reported = true,
1417                    Ok(msg) => {
1418                        assert!(matches!(
1419                            msg.sync_states
1420                                .get("uniswap-v3")
1421                                .unwrap(),
1422                            SynchronizerState::Delayed(_)
1423                        ));
1424                        assert!(matches!(
1425                            msg.sync_states
1426                                .get("uniswap-v2")
1427                                .unwrap(),
1428                            SynchronizerState::Ended(_)
1429                        ));
1430                    }
1431                }
1432            }
1433        }
1434        assert!(error_reported, "BlockSynchronizer did not report final error");
1435
1436        // Wait for nanny to detect task failure and execute cleanup
1437        let result = timeout(Duration::from_secs(2), nanny_handle).await;
1438        assert!(result.is_ok(), "Nanny should complete when synchronizer task exits");
1439
1440        // Verify that the remaining synchronizer received close signal during cleanup
1441        assert!(
1442            v3_sync.was_close_received().await,
1443            "v3_sync should have received close signal during cleanup"
1444        );
1445    }
1446
1447    #[test(tokio::test)]
1448    async fn test_cleanup_timeout_warning() {
1449        // Verify that cleanup_synchronizers emits a warning when synchronizers timeout during
1450        // cleanup
1451        let (_v2_sync, _v3_sync, nanny_handle, _rx) = setup_block_sync_with_behaviour(
1452            MockBehavior::ExitImmediately,
1453            MockBehavior::IgnoreClose,
1454        )
1455        .await;
1456
1457        // Wait for nanny to complete - cleanup should timeout on v3_sync but still complete
1458        let result = timeout(Duration::from_secs(10), nanny_handle).await;
1459        assert!(
1460            result.is_ok(),
1461            "Nanny should complete even when some synchronizers timeout during cleanup"
1462        );
1463
1464        // Note: In a real test environment, we would capture log output to verify the warning was
1465        // emitted. Since this is a unit test without log capture setup, we just verify that
1466        // cleanup completes even when some synchronizers timeout.
1467    }
1468
1469    #[test(tokio::test)]
1470    async fn test_one_synchronizer_goes_stale_while_other_works() {
1471        // Test Case 1: One protocol goes stale and is removed while another protocol works normally
1472        let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1473
1474        // Send block 2 only to v3, v2 will timeout and become delayed
1475        let block2_msg = header_message(2);
1476        let _ = v3_sync
1477            .send_header(block2_msg.clone())
1478            .await;
1479        // Don't send to v2_sync - it will timeout
1480
1481        // Consume second message - v2 should be delayed, v3 ready
1482        let second_feed_msg = receive_message(&mut rx).await;
1483        assert!(second_feed_msg
1484            .state_msgs
1485            .contains_key("uniswap-v3"));
1486        assert!(!second_feed_msg
1487            .state_msgs
1488            .contains_key("uniswap-v2"));
1489        assert!(matches!(
1490            second_feed_msg
1491                .sync_states
1492                .get("uniswap-v3")
1493                .unwrap(),
1494            SynchronizerState::Ready(_)
1495        ));
1496        // v2 should be delayed (if still present) - check nanny is still running
1497        if let Some(v2_state) = second_feed_msg
1498            .sync_states
1499            .get("uniswap-v2")
1500        {
1501            if matches!(v2_state, SynchronizerState::Delayed(_)) {
1502                // Verify nanny is still running when synchronizer is just delayed
1503                assert!(
1504                    !nanny_handle.is_finished(),
1505                    "Nanny should still be running when synchronizer is delayed (not stale yet)"
1506                );
1507            }
1508        }
1509
1510        // Wait a bit, then continue sending blocks to v3 but not v2
1511        tokio::time::sleep(Duration::from_millis(15)).await;
1512
1513        // Continue sending blocks only to v3 to keep it healthy while v2 goes stale
1514        let block3_msg = header_message(3);
1515        let _ = v3_sync
1516            .send_header(block3_msg.clone())
1517            .await;
1518
1519        tokio::time::sleep(Duration::from_millis(40)).await;
1520
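        // Drain up to two messages looking for v2 being reported as Stale.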
1521        let mut stale_found = false;
1522        for _ in 0..2 {
1523            if let Some(Ok(msg)) = rx.recv().await {
1524                if let Some(SynchronizerState::Stale(_)) = msg.sync_states.get("uniswap-v2") {
1525                    stale_found = true;
1526                }
1527            }
1528        }
1529        assert!(stale_found, "v2 synchronizer should be stale");
1530
1531        shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1532    }
1533
1534    #[test(tokio::test)]
1535    async fn test_all_synchronizers_go_stale_main_loop_exits() {
1536        // Test Case 2: All protocols go stale and main loop exits gracefully
1537        let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1538
1539        // Stop sending messages to both synchronizers - they should both timeout and go stale
1540        // Don't send any more messages, let them timeout and become delayed, then stale
1541
1542        // Monitor the state transitions to ensure proper delayed -> stale progression
1543        let mut seen_delayed = false;
1544
1545        // Consume messages and track state transitions
1546        // Give enough time for the synchronizers to transition through states
1547        let timeout_duration = Duration::from_millis(500); // Generous timeout
1548        let start_time = tokio::time::Instant::now();
1549
1550        while let Ok(Some(Ok(msg))) =
1551            tokio::time::timeout(Duration::from_millis(50), rx.recv()).await
1552        {
1553            // Track when synchronizers transition to delayed
1554            if !seen_delayed {
1555                let v2_state = msg.sync_states.get("uniswap-v2");
1556                let v3_state = msg.sync_states.get("uniswap-v3");
1557
1558                if matches!(v2_state, Some(SynchronizerState::Delayed(_))) ||
1559                    matches!(v3_state, Some(SynchronizerState::Delayed(_)))
1560                {
1561                    seen_delayed = true;
1562                    // Verify nanny is still running when synchronizers are just delayed
1563                    assert!(!nanny_handle.is_finished(),
1564                        "Nanny should still be running when synchronizers are delayed (not stale yet)");
1565                    // Once we've seen delayed and verified nanny is running, we can break
1566                    break;
1567                }
1568            }
1569
1570            // Safety timeout to avoid infinite loop
1571            if start_time.elapsed() > timeout_duration {
1572                break;
1573            }
1574        }
1575        // Verify that synchronizers went through proper state transitions
1576        assert!(seen_delayed, "Synchronizers should transition to Delayed state first");
1577
1578        let mut error_reported = false;
1579        // Consume any remaining messages until channel closes
1580        while let Some(msg) = rx.recv().await {
1581            if let Err(e) = msg {
1582                assert!(e
1583                    .to_string()
1584                    .contains("became: Stale(1)"));
1585                assert!(e
1586                    .to_string()
1587                    .contains("reported as Stale(1)"));
1588                error_reported = true;
1589            }
1590        }
1591        assert!(error_reported, "Expected the channel to report an error before closing");
1592
1593        // The nanny should complete when the main loop exits due to no ready synchronizers
1594        let nanny_result = timeout(Duration::from_secs(2), nanny_handle).await;
1595        assert!(nanny_result.is_ok(), "Nanny should complete when main loop exits");
1596
1597        // Verify cleanup was triggered for both synchronizers
1598        assert!(
1599            v2_sync.was_close_received().await,
1600            "v2_sync should have received close signal during cleanup"
1601        );
1602        assert!(
1603            v3_sync.was_close_received().await,
1604            "v3_sync should have received close signal during cleanup"
1605        );
1606    }
1607
1608    #[test(tokio::test)]
1609    async fn test_stale_synchronizer_recovers() {
1610        // One synchronizer goes stale while the other stays healthy, then the stale one catches back up and both become Ready again
1611        let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1612
1613        // Send a second message to v2 only, shortly before both would go stale
1614        tokio::time::sleep(Duration::from_millis(50)).await;
1615        let block2_msg = header_message(2);
1616        let _ = v2_sync
1617            .send_header(block2_msg.clone())
1618            .await;
1619
1620        // we should get two messages here
1621        for _ in 0..2 {
1622            if let Some(msg) = rx.recv().await {
1623                if let Ok(msg) = msg {
1624                    if matches!(
1625                        msg.sync_states
1626                            .get("uniswap-v2")
1627                            .unwrap(),
1628                        SynchronizerState::Ready(_)
1629                    ) {
1630                        assert!(matches!(
1631                            msg.sync_states
1632                                .get("uniswap-v3")
1633                                .unwrap(),
1634                            SynchronizerState::Delayed(_)
1635                        ));
1636                        break;
1637                    };
1638                }
1639            } else {
1640                panic!("Channel closed unexpectedly")
1641            }
1642        }
1643
1644        // Now v3 should be stale
1645        tokio::time::sleep(Duration::from_millis(15)).await;
1646        let block3_msg = header_message(3);
1647        let _ = v2_sync
1648            .send_header(block3_msg.clone())
1649            .await;
1650        let third_msg = receive_message(&mut rx).await;
1652        assert!(matches!(
1653            third_msg
1654                .sync_states
1655                .get("uniswap-v2")
1656                .unwrap(),
1657            SynchronizerState::Ready(_)
1658        ));
1659        assert!(matches!(
1660            third_msg
1661                .sync_states
1662                .get("uniswap-v3")
1663                .unwrap(),
1664            SynchronizerState::Stale(_)
1665        ));
1666
1667        let block4_msg = header_message(4);
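        // Replay the missed blocks to v3 so it can catch up; once both synchronizers have
        // delivered block 4, the next message should report both as Ready again.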
1668        let _ = v3_sync
1669            .send_header(block2_msg.clone())
1670            .await;
1671        let _ = v3_sync
1672            .send_header(block3_msg.clone())
1673            .await;
1674        let _ = v3_sync
1675            .send_header(block4_msg.clone())
1676            .await;
1677        let _ = v2_sync
1678            .send_header(block4_msg.clone())
1679            .await;
1680        let fourth_msg = receive_message(&mut rx).await;
1681        assert!(matches!(
1682            fourth_msg
1683                .sync_states
1684                .get("uniswap-v2")
1685                .unwrap(),
1686            SynchronizerState::Ready(_)
1687        ));
1688        assert!(matches!(
1689            fourth_msg
1690                .sync_states
1691                .get("uniswap-v3")
1692                .unwrap(),
1693            SynchronizerState::Ready(_)
1694        ));
1695
1696        shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1697
1698        // Verify cleanup was triggered for both synchronizers
1699        assert!(
1700            v2_sync.was_close_received().await,
1701            "v2_sync should have received close signal during cleanup"
1702        );
1703        assert!(
1704            v3_sync.was_close_received().await,
1705            "v3_sync should have received close signal during cleanup"
1706        );
1707    }
1708
1709    #[test(tokio::test)]
1710    async fn test_all_synchronizer_advanced() {
1711        // Test the case where all synchronizers successfully recover but stream
1712        // from a disconnected future block.
1713
1714        let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1715
1716        let block3 = header_message(3);
1717        v2_sync
1718            .send_header(block3.clone())
1719            .await
1720            .unwrap();
1721        v3_sync
1722            .send_header(block3)
1723            .await
1724            .unwrap();
1725
1726        let msg = receive_message(&mut rx).await;
1727        assert!(matches!(
1728            msg.sync_states
1729                .get("uniswap-v2")
1730                .unwrap(),
1731            SynchronizerState::Ready(_)
1732        ));
1733        assert!(matches!(
1734            msg.sync_states
1735                .get("uniswap-v3")
1736                .unwrap(),
1737            SynchronizerState::Ready(_)
1738        ));
1739
1740        shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1741    }
1742
1743    #[test(tokio::test)]
1744    async fn test_one_synchronizer_advanced() {
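        // v2 delivers block 4 while v3 only delivers block 2. The assertions below expect the
        // advanced v2 to be Ready and the lagging v3 to be Delayed.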
1745        let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1746
1747        let block2 = header_message(2);
1748        let block4 = header_message(4);
1749        v2_sync
1750            .send_header(block4.clone())
1751            .await
1752            .unwrap();
1753        v3_sync
1754            .send_header(block2.clone())
1755            .await
1756            .unwrap();
1757
1758        let msg = receive_message(&mut rx).await;
1759        assert!(matches!(
1760            msg.sync_states
1761                .get("uniswap-v2")
1762                .unwrap(),
1763            SynchronizerState::Ready(_)
1764        ));
1765        assert!(matches!(
1766            msg.sync_states
1767                .get("uniswap-v3")
1768                .unwrap(),
1769            SynchronizerState::Delayed(_)
1770        ));
1771
1772        shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1773    }
1774}