Skip to main content

tycho_client/feed/
mod.rs

1use std::{
2    collections::{HashMap, HashSet},
3    fmt::{Display, Formatter},
4    time::Duration,
5};
6
7use chrono::{Duration as ChronoDuration, Local, NaiveDateTime};
8use futures03::{future::join_all, stream::FuturesUnordered, FutureExt, StreamExt};
9use serde::{Deserialize, Serialize};
10use thiserror::Error;
11use tokio::{
12    sync::{
13        mpsc::{self, Receiver},
14        oneshot,
15    },
16    task::JoinHandle,
17    time::timeout,
18};
19use tracing::{debug, error, info, trace, warn};
20use tycho_common::{
21    display::opt,
22    models::{blockchain::BlockAggregatedChanges, ExtractorIdentity},
23    Bytes,
24};
25
26use crate::feed::{
27    block_history::{BlockHistory, BlockHistoryError, BlockPosition},
28    synchronizer::{StateSyncMessage, StateSynchronizer, SyncResult, SynchronizerError},
29};
30
31mod block_history;
32pub mod component_tracker;
33pub mod dto;
34pub mod synchronizer;
35
/// A trait representing a minimal interface for types that behave like a block header.
///
/// This abstraction allows working with either full block headers (`BlockHeader`)
/// or simplified structures that only provide a timestamp (e.g., for RFQ logic).
///
/// Both methods take `self` by value, so calling either one consumes the header.
pub trait HeaderLike {
    /// Returns the full [`BlockHeader`] if the implementor carries one, `None` otherwise.
    fn block(self) -> Option<BlockHeader>;
    /// Returns a single `u64` position for this header: the block number for block-based
    /// headers, presumably a timestamp for timestamp-only implementors.
    fn block_number_or_timestamp(self) -> u64;
}
44
/// Compact description of a block as tracked by the feed.
#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, Eq, Hash)]
pub struct BlockHeader {
    /// Hash of the block.
    pub hash: Bytes,
    /// Block number.
    pub number: u64,
    /// Hash of the parent block.
    pub parent_hash: Bytes,
    /// Revert flag, copied from the originating change message.
    pub revert: bool,
    /// Block timestamp as an unsigned integer (seconds when built from a
    /// `BlockAggregatedChanges` message).
    pub timestamp: u64,
    /// Index of a partial block update within a block. None for full blocks.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub partial_block_index: Option<u32>,
}
56
impl BlockHeader {
    /// Returns `true` when this header describes a partial block update, i.e. when
    /// `partial_block_index` is set.
    fn is_partial(&self) -> bool {
        self.partial_block_index.is_some()
    }
}
62
63impl Display for BlockHeader {
64    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
65        // Take first 6 hex chars of the hash for readability
66        let short_hash = if self.hash.len() >= 4 {
67            hex::encode(&self.hash[..4]) // 4 bytes → 8 hex chars
68        } else {
69            hex::encode(&self.hash)
70        };
71
72        match self.partial_block_index {
73            Some(idx) => write!(f, "Block #{} [0x{}..] (partial {})", self.number, short_hash, idx),
74            None => write!(f, "Block #{} [0x{}..]", self.number, short_hash),
75        }
76    }
77}
78
79impl From<&BlockAggregatedChanges> for BlockHeader {
80    fn from(block_changes: &BlockAggregatedChanges) -> Self {
81        let block = &block_changes.block;
82        Self {
83            hash: block.hash.clone(),
84            number: block.number,
85            parent_hash: block.parent_hash.clone(),
86            revert: block_changes.revert,
87            timestamp: block.ts.and_utc().timestamp() as u64,
88            partial_block_index: block_changes.partial_block_index,
89        }
90    }
91}
92
impl HeaderLike for BlockHeader {
    /// A `BlockHeader` is already a full header, so it is returned as-is.
    fn block(self) -> Option<BlockHeader> {
        Some(self)
    }

    /// Returns the block number of this header.
    fn block_number_or_timestamp(self) -> u64 {
        self.number
    }
}
102
/// Errors produced by the `BlockSynchronizer` during startup and while aligning streams.
#[derive(Error, Debug)]
pub enum BlockSynchronizerError {
    /// A registered synchronizer failed to initialize.
    #[error("Failed to initialize extractor '{extractor}': {source}")]
    InitializationError {
        /// Identity of the extractor whose synchronizer failed.
        extractor: ExtractorIdentity,
        /// The underlying synchronizer error.
        #[source]
        source: SynchronizerError,
    },

    /// An error bubbled up from the block history while processing a block.
    #[error("Failed to process new block: {0}")]
    BlockHistoryError(#[from] BlockHistoryError),

    /// None of the synchronizers was ready; the payload carries a description.
    #[error("Not a single synchronizer was ready: {0}")]
    NoReadySynchronizers(String),

    /// No synchronizers were registered, or none remained after initialization.
    #[error("No synchronizers were set")]
    NoSynchronizers,

    /// A `std::time::Duration` could not be converted into a chrono duration; the
    /// payload is the stringified conversion error.
    #[error("Failed to convert duration: {0}")]
    DurationConversionError(String),
}
124
/// Result alias used throughout this module, with `BlockSynchronizerError` as the error type.
type BlockSyncResult<T> = Result<T, BlockSynchronizerError>;
126
/// Aligns multiple StateSynchronizers on the block dimension.
///
/// ## Purpose
/// The purpose of this component is to handle streams from multiple state synchronizers and
/// align/merge them according to their blocks. Ideally this should be done in a fault-tolerant way,
/// meaning we can recover from a state synchronizer suffering from timing issues. E.g. a delayed or
/// unresponsive state synchronizer might recover again, or an advanced state synchronizer can be
/// included again once we reach the block it is at.
///
/// ## Limitations
/// - Supports only chains with fixed block times for now due to the lock step mechanism.
///
/// ## Initialisation
/// Queries all registered synchronizers for their first message and evaluates the state of each
/// synchronizer. If a synchronizer's first message is an older block, it is marked as delayed.
/// If no message is received within the startup timeout, the synchronizer is marked as stale and is
/// closed. The startup timeout defaults to `block_time * max_missed_blocks` (set in `new`) and is
/// stored in the `startup_timeout` field.
///
/// ## Main loop
/// Once started, the synchronizers are queried concurrently for messages in lock step:
/// the main loop queries all synchronizers in the ready state for the last emitted data, builds
/// the `FeedMessage` and emits it, then it schedules the wait procedure for the next block.
///
/// ## Synchronization Logic
///
/// To classify a synchronizer as delayed, we need to first define the current block. The highest
/// block number of all ready synchronizers is considered the current block.
///
/// Once we have the current block we can easily determine which block we expect next. And if a
/// synchronizer delivers an older block we can classify it as delayed.
///
/// If any synchronizer is not in the ready state we will try to bring it back to the ready state.
/// This is done by trying to empty any buffers of a delayed synchronizer or waiting to reach
/// the height of an advanced synchronizer (and flagging it as such in the meantime).
///
/// Of course, we can't wait forever for a synchronizer to reply/recover. All of this must happen
/// within the block production step of the blockchain:
/// The wait procedure consists of waiting for any of the individual ProtocolStateSynchronizers
/// to emit a new message (within a max timeout - several multiples of the block time). Once a
/// message is received a very short timeout starts for the remaining synchronizers, to deliver a
/// message. Any synchronizer failing to do so is transitioned to delayed.
///
/// ### Note
/// The process described above is the goal. It is currently not implemented like that. Instead we
/// simply wait `block_time` + `latency_buffer`. Synchronizers are expected to respond within that
/// timeout. This is simpler but only works well on chains with fixed block times.
pub struct BlockSynchronizer<S> {
    /// Registered synchronizers keyed by extractor identity; `None` until the first
    /// registration.
    synchronizers: Option<HashMap<ExtractorIdentity, S>>,
    /// Time to wait for a block usually
    block_time: std::time::Duration,
    /// Added on top of block time to account for latency
    latency_buffer: std::time::Duration,
    /// Time to wait for the full first message, including snapshot retrieval
    startup_timeout: std::time::Duration,
    /// Optionally, end the stream after emitting max messages
    max_messages: Option<usize>,
    /// Amount of blocks a protocol can be delayed for, before it is considered stale
    max_missed_blocks: u64,
}
187
/// State of a single synchronizer stream relative to the other streams.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "lowercase")]
pub enum SynchronizerState {
    /// Initial state, assigned before trying to receive any message
    Started,
    /// The synchronizer emitted a message for the block as expected
    Ready(BlockHeader),
    /// The synchronizer is on a previous block compared to others and is expected to
    /// catch up soon.
    Delayed(BlockHeader),
    /// The synchronizer hasn't emitted messages for > `max_missed_blocks` or has
    /// fallen far behind. At this point we do not wait for it anymore but it can
    /// still eventually recover.
    Stale(BlockHeader),
    /// The synchronizer is on a future block that is not connected to the current one.
    // For this to happen we must have a gap, and a gap usually means a new snapshot from the
    // StateSynchronizer. This can only happen if we are processing too slowly and one or all of
    // the synchronizers restart, e.g. due to websocket connection drops.
    Advanced(BlockHeader),
    /// The synchronizer ended; the payload is the error message or a close reason.
    Ended(String),
}
210
211impl Display for SynchronizerState {
212    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
213        match self {
214            SynchronizerState::Started => write!(f, "Started"),
215            SynchronizerState::Ready(b) => write!(f, "Started({})", b.number),
216            SynchronizerState::Delayed(b) => write!(f, "Delayed({})", b.number),
217            SynchronizerState::Stale(b) => write!(f, "Stale({})", b.number),
218            SynchronizerState::Advanced(b) => write!(f, "Advanced({})", b.number),
219            SynchronizerState::Ended(reason) => write!(f, "Ended({})", reason),
220        }
221    }
222}
223
/// Wraps the receiving end of one synchronizer together with its tracked state.
pub struct SynchronizerStream {
    /// Identity of the extractor this stream belongs to.
    extractor_id: ExtractorIdentity,
    /// Current state of the stream.
    state: SynchronizerState,
    /// The error that ended the stream, if it ended with one.
    error: Option<SynchronizerError>,
    /// Time of the last state update, as a naive UTC timestamp.
    modify_ts: NaiveDateTime,
    /// Channel on which the synchronizer delivers its messages or a terminal error.
    rx: Receiver<SyncResult<StateSyncMessage<BlockHeader>>>,
}
231
232impl SynchronizerStream {
233    fn new(
234        extractor_id: &ExtractorIdentity,
235        rx: Receiver<SyncResult<StateSyncMessage<BlockHeader>>>,
236    ) -> Self {
237        Self {
238            extractor_id: extractor_id.clone(),
239            state: SynchronizerState::Started,
240            error: None,
241            modify_ts: Local::now().naive_utc(),
242            rx,
243        }
244    }
245    async fn try_advance(
246        &mut self,
247        block_history: &BlockHistory,
248        block_time: std::time::Duration,
249        latency_buffer: std::time::Duration,
250        stale_threshold: std::time::Duration,
251    ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
252        let extractor_id = self.extractor_id.clone();
253        let latest_block = block_history.latest();
254
255        match &self.state {
256            SynchronizerState::Started | SynchronizerState::Ended(_) => {
257                warn!(state=?&self.state, "Advancing Synchronizer in this state not supported!");
258                Ok(None)
259            }
260            SynchronizerState::Advanced(b) => {
261                let future_block = b.clone();
262                // Transition to ready once we arrived at the expected height
263                self.transition(future_block, block_history, stale_threshold)?;
264                Ok(None)
265            }
266            SynchronizerState::Ready(previous_block) => {
267                // Try to recv the next expected block, update state accordingly.
268                self.try_recv_next_expected(
269                    block_time + latency_buffer,
270                    block_history,
271                    previous_block.clone(),
272                    stale_threshold,
273                )
274                .await
275            }
276            SynchronizerState::Delayed(old_block) => {
277                // try to catch up all currently queued blocks until the expected block
278                debug!(
279                    %old_block,
280                    latest_block=opt(&latest_block),
281                    %extractor_id,
282                    "Trying to catch up to latest block"
283                );
284                self.try_catch_up(block_history, block_time + latency_buffer, stale_threshold)
285                    .await
286            }
287            SynchronizerState::Stale(old_block) => {
288                // try to catch up all currently queued blocks until the expected block
289                debug!(
290                    %old_block,
291                    latest_block=opt(&latest_block),
292                    %extractor_id,
293                    "Trying to catch up stale synchronizer to latest block"
294                );
295                self.try_catch_up(block_history, block_time, stale_threshold)
296                    .await
297            }
298        }
299    }
300
301    /// Standard way to advance a well-behaved state synchronizer.
302    ///
303    /// Will wait for a new block on the synchronizer within a timeout. And modify its
304    /// state based on the outcome.
305    ///
306    /// ## Note
307    /// This method assumes that the current state is `Ready`.
308    async fn try_recv_next_expected(
309        &mut self,
310        max_wait: std::time::Duration,
311        block_history: &BlockHistory,
312        previous_block: BlockHeader,
313        stale_threshold: std::time::Duration,
314    ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
315        let extractor_id = self.extractor_id.clone();
316        match timeout(max_wait, self.rx.recv()).await {
317            Ok(Some(Ok(msg))) => {
318                self.transition(msg.header.clone(), block_history, stale_threshold)?;
319                Ok(Some(msg))
320            }
321            Ok(Some(Err(e))) => {
322                // The underlying synchronizer exhausted its retries
323                self.mark_errored(e);
324                Ok(None)
325            }
326            Ok(None) => {
327                // This case should not happen, as we shouldn't poll the synchronizer after we
328                // closed it or after it errored.
329                warn!(
330                    %extractor_id,
331                    "Tried to poll from closed synchronizer.",
332                );
333                self.mark_closed();
334                Ok(None)
335            }
336            Err(_) => {
337                // trying to advance a block timed out
338                debug!(%extractor_id, %previous_block, "No block received within time limit.");
339                // No need to consider state since we only call this method if we are
340                // in the Ready state.
341                self.state = SynchronizerState::Delayed(previous_block.clone());
342                self.modify_ts = Local::now().naive_utc();
343                Ok(None)
344            }
345        }
346    }
347
348    /// Tries to catch up a delayed state synchronizer.
349    ///
350    /// If a synchronizer is delayed, this method will try to catch up to the next expected block
351    /// by consuming all waiting messages in its queue and waiting for any new block messages
352    /// within a timeout. Finally, all update messages are merged into one and returned.
353    async fn try_catch_up(
354        &mut self,
355        block_history: &BlockHistory,
356        max_wait: std::time::Duration,
357        stale_threshold: std::time::Duration,
358    ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
359        let mut results = Vec::new();
360        let extractor_id = self.extractor_id.clone();
361
362        // Set a deadline for the overall catch-up operation
363        let deadline = std::time::Instant::now() + max_wait;
364
365        while std::time::Instant::now() < deadline {
366            match timeout(
367                deadline.saturating_duration_since(std::time::Instant::now()),
368                self.rx.recv(),
369            )
370            .await
371            {
372                Ok(Some(Ok(msg))) => {
373                    debug!(%extractor_id, block=%msg.header, "Received new message during catch-up");
374                    let block_pos = block_history.determine_block_position(&msg.header)?;
375                    results.push(msg);
376                    if matches!(block_pos, BlockPosition::NextExpected | BlockPosition::NextPartial)
377                    {
378                        break;
379                    }
380                }
381                Ok(Some(Err(e))) => {
382                    // Synchronizer errored during catch up
383                    self.mark_errored(e);
384                    return Ok(None);
385                }
386                Ok(None) => {
387                    // This case should not happen, as we shouldn't poll the synchronizer after we
388                    // closed it or after it errored.
389                    warn!(
390                        %extractor_id,
391                        "Tried to poll from closed synchronizer during catch up.",
392                    );
393                    self.mark_closed();
394                    return Ok(None);
395                }
396                Err(_) => {
397                    debug!(%extractor_id, "Timed out waiting for catch-up");
398                    break;
399                }
400            }
401        }
402
403        let merged = results
404            .into_iter()
405            .reduce(|l, r| l.merge(r));
406
407        if let Some(msg) = merged {
408            // we were able to get at least one block out
409            debug!(%extractor_id, "Delayed extractor made progress!");
410            self.transition(msg.header.clone(), block_history, stale_threshold)?;
411            Ok(Some(msg))
412        } else {
413            // No progress made during catch-up, check if we should go stale
414            self.check_and_transition_to_stale_if_needed(stale_threshold, None)?;
415            Ok(None)
416        }
417    }
418
419    /// Helper method to check if synchronizer should transition to stale based on time elapsed
420    fn check_and_transition_to_stale_if_needed(
421        &mut self,
422        stale_threshold: std::time::Duration,
423        fallback_header: Option<BlockHeader>,
424    ) -> Result<bool, BlockSynchronizerError> {
425        let now = Local::now().naive_utc();
426        let wait_duration = now.signed_duration_since(self.modify_ts);
427        let stale_threshold_chrono = ChronoDuration::from_std(stale_threshold)
428            .map_err(|e| BlockSynchronizerError::DurationConversionError(e.to_string()))?;
429
430        if wait_duration > stale_threshold_chrono {
431            let header_to_use = match (&self.state, fallback_header) {
432                (SynchronizerState::Ready(h), _) |
433                (SynchronizerState::Delayed(h), _) |
434                (SynchronizerState::Stale(h), _) => h.clone(),
435                (_, Some(h)) => h,
436                _ => BlockHeader::default(),
437            };
438
439            warn!(
440                extractor_id=%self.extractor_id,
441                last_message_at=?self.modify_ts,
442                "SynchronizerStream transition to stale due to timeout."
443            );
444            self.state = SynchronizerState::Stale(header_to_use);
445            self.modify_ts = now;
446            Ok(true)
447        } else {
448            Ok(false)
449        }
450    }
451
452    /// Logic to transition a state synchronizer based on newly received block
453    ///
454    /// Updates the synchronizer's state according to the position of the received block:
455    /// - Next expected block -> Ready state
456    /// - Latest/Delayed block -> Either Delayed or Stale (if >60s since last update)
457    /// - Advanced block -> Advanced state (block ahead of expected position)
458    fn transition(
459        &mut self,
460        latest_retrieved: BlockHeader,
461        block_history: &BlockHistory,
462        stale_threshold: std::time::Duration,
463    ) -> Result<(), BlockSynchronizerError> {
464        let extractor_id = self.extractor_id.clone();
465        let last_message_at = self.modify_ts;
466        let block = &latest_retrieved;
467
468        match block_history.determine_block_position(&latest_retrieved)? {
469            BlockPosition::NextExpected | BlockPosition::NextPartial => {
470                self.state = SynchronizerState::Ready(latest_retrieved.clone());
471                trace!(
472                    next = %latest_retrieved,
473                    extractor = %extractor_id,
474                    "SynchronizerStream transition to next expected"
475                )
476            }
477            BlockPosition::Latest | BlockPosition::Delayed => {
478                if !self.check_and_transition_to_stale_if_needed(
479                    stale_threshold,
480                    Some(latest_retrieved.clone()),
481                )? {
482                    warn!(
483                        %extractor_id,
484                        ?last_message_at,
485                        %block,
486                        "SynchronizerStream transition transition to delayed."
487                    );
488                    self.state = SynchronizerState::Delayed(latest_retrieved.clone());
489                }
490            }
491            BlockPosition::Advanced => {
492                info!(
493                    %extractor_id,
494                    ?last_message_at,
495                    latest = opt(&block_history.latest()),
496                    %block,
497                    "SynchronizerStream transition to advanced."
498                );
499                self.state = SynchronizerState::Advanced(latest_retrieved.clone());
500            }
501        }
502        self.modify_ts = Local::now().naive_utc();
503        Ok(())
504    }
505
506    /// Marks this stream as errored
507    ///
508    /// Sets an error and transitions the stream to Ended, correctly recording the
509    /// time at which this happened.
510    fn mark_errored(&mut self, error: SynchronizerError) {
511        self.state = SynchronizerState::Ended(error.to_string());
512        self.modify_ts = Local::now().naive_utc();
513        self.error = Some(error);
514    }
515
516    /// Marks a stream as closed.
517    ///
518    /// If the stream has not been ended previously, e.g. by an error it will be marked
519    /// as Ended without error. This should not happen since we should stop consuming
520    /// from the stream if an error occured.
521    fn mark_closed(&mut self) {
522        if !matches!(self.state, SynchronizerState::Ended(_)) {
523            self.state = SynchronizerState::Ended("Closed".to_string());
524            self.modify_ts = Local::now().naive_utc();
525        }
526    }
527
528    /// Marks a stream as stale.
529    fn mark_stale(&mut self, header: &BlockHeader) {
530        self.state = SynchronizerState::Stale(header.clone());
531        self.modify_ts = Local::now().naive_utc();
532    }
533
534    /// Marks this stream as ready.
535    fn mark_ready(&mut self, header: &BlockHeader) {
536        self.state = SynchronizerState::Ready(header.clone());
537        self.modify_ts = Local::now().naive_utc();
538    }
539
    /// Returns `true` if the stream is in the `Ended` state.
    fn has_ended(&self) -> bool {
        matches!(self.state, SynchronizerState::Ended(_))
    }
543
    /// Returns `true` if the stream is in the `Stale` state.
    fn is_stale(&self) -> bool {
        matches!(self.state, SynchronizerState::Stale(_))
    }
547
    /// Returns `true` if the stream is in the `Advanced` state.
    fn is_advanced(&self) -> bool {
        matches!(self.state, SynchronizerState::Advanced(_))
    }
551
552    /// Gets the streams current header from active streams.
553    ///
554    /// A stream is considered as active unless it has ended or is stale.
555    fn get_current_header(&self) -> Option<&BlockHeader> {
556        match &self.state {
557            SynchronizerState::Ready(b) |
558            SynchronizerState::Delayed(b) |
559            SynchronizerState::Advanced(b) => Some(b),
560            _ => None,
561        }
562    }
563}
564
/// A combined message carrying the per-synchronizer state messages together with the
/// state of each synchronizer.
#[derive(Debug, PartialEq, Clone)]
pub struct FeedMessage<H = BlockHeader>
where
    H: HeaderLike,
{
    /// State messages by string key (presumably the extractor name; confirm against the
    /// code that builds the message).
    pub state_msgs: HashMap<String, StateSyncMessage<H>>,
    /// Synchronizer states by string key (presumably the same keys as `state_msgs`).
    pub sync_states: HashMap<String, SynchronizerState>,
}
573
impl<H> FeedMessage<H>
where
    H: HeaderLike,
{
    /// Bundles the given state messages and synchronizer states into a `FeedMessage`.
    fn new(
        state_msgs: HashMap<String, StateSyncMessage<H>>,
        sync_states: HashMap<String, SynchronizerState>,
    ) -> Self {
        Self { state_msgs, sync_states }
    }
}
585
586impl<S> BlockSynchronizer<S>
587where
588    S: StateSynchronizer,
589{
590    pub fn new(
591        block_time: std::time::Duration,
592        latency_buffer: std::time::Duration,
593        max_missed_blocks: u64,
594    ) -> Self {
595        Self {
596            synchronizers: None,
597            max_messages: None,
598            block_time,
599            latency_buffer,
600            startup_timeout: block_time.mul_f64(max_missed_blocks as f64),
601            max_missed_blocks,
602        }
603    }
604
    /// Limits the stream to emit a maximum number of messages.
    ///
    /// After the stream has emitted `val` messages it will end. This is only useful for
    /// testing purposes or if you only want to process a fixed amount of messages
    /// and then terminate cleanly.
    ///
    /// Unlike the consuming builder methods on this type, this one mutates in place.
    pub fn max_messages(&mut self, val: usize) {
        self.max_messages = Some(val);
    }
613
614    /// Sets timeout for the first message of a protocol.
615    ///
616    /// Time to wait for the full first message, including snapshot retrieval.
617    pub fn startup_timeout(mut self, val: Duration) {
618        self.startup_timeout = val;
619    }
620
621    pub fn register_synchronizer(mut self, id: ExtractorIdentity, synchronizer: S) -> Self {
622        let mut registered = self.synchronizers.unwrap_or_default();
623        registered.insert(id, synchronizer);
624        self.synchronizers = Some(registered);
625        self
626    }
627
628    #[cfg(test)]
629    pub fn with_short_timeouts() -> Self {
630        Self::new(Duration::from_millis(10), Duration::from_millis(10), 3)
631    }
632
633    /// Cleanup function for shutting down remaining synchronizers when the nanny detects an error.
634    /// Sends close signals to all remaining synchronizers and waits for them to complete.
635    async fn cleanup_synchronizers(
636        mut state_sync_tasks: FuturesUnordered<JoinHandle<()>>,
637        sync_close_senders: Vec<oneshot::Sender<()>>,
638    ) {
639        // Send close signals to all remaining synchronizers
640        for close_sender in sync_close_senders {
641            let _ = close_sender.send(());
642        }
643
644        // Await remaining tasks with timeout
645        let mut completed_tasks = 0;
646        while let Ok(Some(_)) = timeout(Duration::from_secs(5), state_sync_tasks.next()).await {
647            completed_tasks += 1;
648        }
649
650        // Warn if any synchronizers timed out during cleanup
651        let remaining_tasks = state_sync_tasks.len();
652        if remaining_tasks > 0 {
653            warn!(
654                completed = completed_tasks,
655                timed_out = remaining_tasks,
656                "Some synchronizers timed out during cleanup and may not have shut down cleanly"
657            );
658        }
659    }
660
661    /// Starts the synchronization of streams.
662    ///
663    /// Will error directly if the startup fails. Once the startup is complete, it will
664    /// communicate any fatal errors through the stream before closing it.
665    pub async fn run(
666        mut self,
667    ) -> BlockSyncResult<(JoinHandle<()>, Receiver<BlockSyncResult<FeedMessage<BlockHeader>>>)>
668    {
669        trace!("Starting BlockSynchronizer...");
670        let state_sync_tasks = FuturesUnordered::new();
671        let mut synchronizers = self
672            .synchronizers
673            .take()
674            .ok_or(BlockSynchronizerError::NoSynchronizers)?;
675        // init synchronizers; unknown extractors are warned about and skipped rather than
676        // crashing the whole client, so a misconfigured protocol doesn't take down valid ones.
677        let init_results = join_all(synchronizers.iter_mut().map(|(id, s)| {
678            s.initialize()
679                .map(|res| (id.clone(), res))
680        }))
681        .await;
682        let mut to_skip = Vec::new();
683        for (extractor_id, result) in init_results {
684            match result {
685                Ok(()) => {}
686                Err(SynchronizerError::RPCError(crate::rpc::RPCError::UnknownExtractor(
687                    reason,
688                ))) => {
689                    warn!(%extractor_id, %reason, "Extractor not recognised by server, skipping");
690                    to_skip.push(extractor_id);
691                }
692                Err(e) => {
693                    return Err(BlockSynchronizerError::InitializationError {
694                        extractor: extractor_id,
695                        source: e,
696                    })
697                }
698            }
699        }
700        for id in &to_skip {
701            synchronizers.remove(id);
702        }
703        if synchronizers.is_empty() {
704            return Err(BlockSynchronizerError::NoSynchronizers);
705        }
706
707        let mut sync_streams = Vec::with_capacity(synchronizers.len());
708        let mut sync_close_senders = Vec::new();
709        for (extractor_id, synchronizer) in synchronizers.drain() {
710            let (handle, rx) = synchronizer.start().await;
711            let (join_handle, close_sender) = handle.split();
712            state_sync_tasks.push(join_handle);
713            sync_close_senders.push(close_sender);
714
715            sync_streams.push(SynchronizerStream::new(&extractor_id, rx));
716        }
717
718        // startup, schedule first set of futures and wait for them to return to initialise
719        // synchronizers.
720        debug!("Waiting for initial synchronizer messages...");
721        let mut startup_futures = Vec::new();
722        for synchronizer in sync_streams.iter_mut() {
723            let fut = async {
724                let res = timeout(self.startup_timeout, synchronizer.rx.recv()).await;
725                (synchronizer, res)
726            };
727            startup_futures.push(fut);
728        }
729
730        let mut ready_sync_msgs = HashMap::new();
731        let initial_headers = join_all(startup_futures)
732            .await
733            .into_iter()
734            .filter_map(|(synchronizer, res)| {
735                let extractor_id = synchronizer.extractor_id.clone();
736                match res {
737                    Ok(Some(Ok(msg))) => {
738                        debug!(%extractor_id, height=?&msg.header.number, "Synchronizer started successfully!");
739                        // initially default all synchronizers to Ready
740                        synchronizer.mark_ready(&msg.header);
741                        let header = msg.header.clone();
742                        ready_sync_msgs.insert(extractor_id.name.clone(), msg);
743                        Some(header)
744                    }
745                    Ok(Some(Err(e))) => {
746                        synchronizer.mark_errored(e);
747                        None
748                    }
749                    Ok(None) => {
750                        // Synchronizer closed channel. This can only happen if the run
751                        // task ended, before this, the synchronizer should have sent
752                        // an error, so this case we likely don't have to handle that
753                        // explicitly
754                        warn!(%extractor_id, "Synchronizer closed during startup");
755                        synchronizer.mark_closed();
756                        None
757                    }
758                    Err(_) => {
759                        // We got an error because the synchronizer timed out during startup
760                        warn!(%extractor_id, "Timed out waiting for first message");
761                        synchronizer.mark_stale(&BlockHeader::default());
762                        None
763                    }
764                }
765            })
766            .collect::<HashSet<_>>() // remove duplicates
767            .into_iter()
768            .collect::<Vec<_>>();
769
770        // Fail fast if no synchronizer produced a ready first message.
771        Self::require_active_stream(&sync_streams)?;
772        let mut block_history = BlockHistory::new(initial_headers, 15)?;
773        // Determine the starting header for synchronization.
774        // Safe: require_active_stream above guarantees at least one Ready stream,
775        // so initial_headers is non-empty and block_history.latest() is Some.
776        let start_header = block_history
777            .latest()
778            .ok_or(BlockHistoryError::EmptyHistory)?;
779        info!(
780            start_block=%start_header,
781            n_healthy=ready_sync_msgs.len(),
782            n_total=sync_streams.len(),
783            "Block synchronization started successfully!"
784        );
785
786        // Determine correct state for each remaining synchronizer, based on their header vs the
787        // latest one
788        for stream in sync_streams.iter_mut() {
789            if let SynchronizerState::Ready(header) = stream.state.clone() {
790                if header.number < start_header.number {
791                    debug!(
792                        extractor_id=%stream.extractor_id,
793                        synchronizer_block=header.number,
794                        current_block=start_header.number,
795                        "Marking synchronizer as delayed during initialization"
796                    );
797                    stream.state = SynchronizerState::Delayed(header);
798                }
799            }
800        }
801
802        let (sync_tx, sync_rx) = mpsc::channel(30);
803        let main_loop_jh = tokio::spawn(async move {
804            let mut n_iter = 1;
805            loop {
806                // Send retrieved data to receivers.
807                let msg = FeedMessage::new(
808                    std::mem::take(&mut ready_sync_msgs),
809                    sync_streams
810                        .iter()
811                        .map(|stream| (stream.extractor_id.name.to_string(), stream.state.clone()))
812                        .collect(),
813                );
814                if sync_tx.send(Ok(msg)).await.is_err() {
815                    info!("Receiver closed, block synchronizer terminating..");
816                    return;
817                };
818
819                // Check if we have reached the max messages
820                if let Some(max_messages) = self.max_messages {
821                    if n_iter >= max_messages {
822                        info!(max_messages, "StreamEnd");
823                        return;
824                    }
825                }
826                n_iter += 1;
827
828                let res = self
829                    .handle_next_message(
830                        &mut sync_streams,
831                        &mut ready_sync_msgs,
832                        &mut block_history,
833                    )
834                    .await;
835
836                if let Err(e) = res {
837                    // Communicate error to clients, then end the loop
838                    let _ = sync_tx.send(Err(e)).await;
839                    return;
840                }
841            }
842        });
843
844        // We await the main loop and log any panics (should be impossible). If the
845        // main loop exits, all synchronizers should be ended or stale. So we kill any
846        // remaining stale ones just in case. A final error is propagated through the
847        // channel to the user.
848        let nanny_jh = tokio::spawn(async move {
849            // report any panics
850            let _ = main_loop_jh.await.map_err(|e| {
851                if e.is_panic() {
852                    error!("BlockSynchornizer main loop panicked: {e}")
853                }
854            });
855            debug!("Main loop exited. Closing synchronizers");
856            Self::cleanup_synchronizers(state_sync_tasks, sync_close_senders).await;
857            debug!("Shutdown complete");
858        });
859        Ok((nanny_jh, sync_rx))
860    }
861
862    /// Retrieves next message from synchronizers
863    ///
864    /// The result is written into `ready_sync_messages`. Errors only if there is a
865    /// non-recoverable error or all synchronizers have ended.
866    async fn handle_next_message(
867        &self,
868        sync_streams: &mut [SynchronizerStream],
869        ready_sync_msgs: &mut HashMap<String, StateSyncMessage<BlockHeader>>,
870        block_history: &mut BlockHistory,
871    ) -> BlockSyncResult<()> {
872        let mut recv_futures = Vec::new();
873        for stream in sync_streams.iter_mut() {
874            // If stream is in ended state, do not check for any messages (it's receiver
875            // is closed), but do check stale streams.
876            if stream.has_ended() {
877                continue;
878            }
879            // Here we simply wait block_time + max_wait. This will not work for chains with
880            // unknown block times but is simple enough for now.
881            // If we would like to support unknown block times we could: Instruct all handles to
882            // await the max block time, if a header arrives within that time transition as
883            // usual, but via a select statement get notified (using e.g. Notify) if any other
884            // handle finishes before the timeout. Then await again but this time only for
885            // max_wait and then proceed as usual. So basically each try_advance task would have
886            // a select statement that allows it to exit the first timeout preemptively if any
887            // other try_advance task finished earlier.
888            recv_futures.push(async {
889                let res = stream
890                    .try_advance(
891                        block_history,
892                        self.block_time,
893                        self.latency_buffer,
894                        self.block_time
895                            .mul_f64(self.max_missed_blocks as f64),
896                    )
897                    .await?;
898                Ok::<_, BlockSynchronizerError>(
899                    res.map(|msg| (stream.extractor_id.name.clone(), msg)),
900                )
901            });
902        }
903        ready_sync_msgs.extend(
904            join_all(recv_futures)
905                .await
906                .into_iter()
907                .collect::<Result<Vec<_>, _>>()?
908                .into_iter()
909                .flatten(),
910        );
911
912        // Check if we have any active synchronizers (Ready, Delayed, or Advanced)
913        // If all synchronizers have been purged (Stale/Ended), exit the main loop
914        Self::check_streams(sync_streams)?;
915
916        // if we have any advanced header, we reinit the block history,
917        // else we simply advance the existing history
918        if sync_streams
919            .iter()
920            .any(SynchronizerStream::is_advanced)
921        {
922            *block_history = Self::reinit_block_history(sync_streams, block_history)?;
923        } else if let Some(header) = sync_streams
924            .iter()
925            .filter_map(SynchronizerStream::get_current_header)
926            .max_by_key(|b| b.number)
927        {
928            block_history.push(header.clone())?;
929        }
930        // If all synchronizers are stale (e.g. WS reconnect in progress), skip block
931        // history update and continue waiting for recovery.
932        Ok(())
933    }
934
935    /// Reinitialise block history and reclassifies active synchronizers states.
936    ///
937    /// We call this if we detect a future detached block. This usually only happens if
938    /// a synchronizer has a restart.
939    ///
940    /// ## Note
941    /// This method assumes that at least one synchronizer is in Advanced, Ready or
942    /// Delayed state, it will return an error in case this is not the case.
943    fn reinit_block_history(
944        sync_streams: &mut [SynchronizerStream],
945        block_history: &mut BlockHistory,
946    ) -> Result<BlockHistory, BlockSynchronizerError> {
947        let previous = block_history
948            .latest()
949            // Old block history should not be empty, startup should have populated it at this point
950            .ok_or(BlockHistoryError::EmptyHistory)?;
951        let blocks = sync_streams
952            .iter()
953            .filter_map(SynchronizerStream::get_current_header)
954            .cloned()
955            .collect();
956        let new_block_history = BlockHistory::new(blocks, 10)?;
957        let latest = new_block_history
958            .latest()
959            // Block history should not be empty, we just populated it.
960            .ok_or(BlockHistoryError::EmptyHistory)?;
961        info!(
962             %previous,
963            %latest,
964            "Advanced synchronizer detected. Reinitialized block history."
965        );
966        sync_streams
967            .iter_mut()
968            .for_each(|stream| {
969                // we only get headers from advanced, ready and delayed so stale
970                // or ended streams are not considered here
971                if let Some(header) = stream.get_current_header() {
972                    if header.number < latest.number {
973                        stream.state = SynchronizerState::Delayed(header.clone());
974                    } else if header.number == latest.number {
975                        stream.state = SynchronizerState::Ready(header.clone());
976                    }
977                }
978            });
979        Ok(new_block_history)
980    }
981
982    /// Startup check: fails if no synchronizer is active (all Stale or Ended at init time).
983    ///
984    /// Used once before entering the main loop, where all-Stale is a fatal configuration
985    /// problem (nothing to sync from), not a temporary disconnect.
986    fn require_active_stream(sync_streams: &[SynchronizerStream]) -> BlockSyncResult<()> {
987        if sync_streams
988            .iter()
989            .any(|s| !s.has_ended() && !s.is_stale())
990        {
991            return Ok(());
992        }
993        let reason: Vec<String> = sync_streams
994            .iter()
995            .map(|s| format!("{} reported as {} at {}", s.extractor_id, s.state, s.modify_ts))
996            .collect();
997        Err(BlockSynchronizerError::NoReadySynchronizers(reason.join(", ")))
998    }
999
1000    /// Checks if the main loop should continue or exit.
1001    ///
1002    /// Returns `Ok` if:
1003    /// - At least one synchronizer is active (Ready, Delayed, or Advanced), OR
1004    /// - All synchronizers are stale but none have ended — temporary disconnect (e.g. WS reconnect)
1005    ///   where recovery is still possible.
1006    ///
1007    /// Returns `Err` if at least one synchronizer has permanently ended while all
1008    /// remaining ones are stale — no recovery path exists.
1009    fn check_streams(sync_streams: &[SynchronizerStream]) -> BlockSyncResult<()> {
1010        let mut has_any_ended = false;
1011        let mut latest_ended_stream: Option<&SynchronizerStream> = None;
1012
1013        for stream in sync_streams.iter() {
1014            // If we have at least one active stream (ready, delayed, or advanced), continue.
1015            if !stream.has_ended() && !stream.is_stale() {
1016                return Ok(());
1017            }
1018
1019            if stream.has_ended() {
1020                has_any_ended = true;
1021                if latest_ended_stream.is_none() ||
1022                    stream.modify_ts >
1023                        latest_ended_stream
1024                            .as_ref()
1025                            .unwrap()
1026                            .modify_ts
1027                {
1028                    latest_ended_stream = Some(stream);
1029                }
1030            }
1031        }
1032
1033        // All streams are stale or ended. If none have ended, all synchronizers are
1034        // temporarily disconnected — wait for recovery without exiting the main loop.
1035        if !has_any_ended {
1036            return Ok(());
1037        }
1038
1039        // At least one synchronizer has permanently ended while all others are stale.
1040        let last_error_reason = if let Some(stream) = latest_ended_stream {
1041            if let Some(err) = &stream.error {
1042                format!("Synchronizer for {} errored with: {err}", stream.extractor_id)
1043            } else {
1044                format!("Synchronizer for {} became: {}", stream.extractor_id, stream.state)
1045            }
1046        } else {
1047            return Err(BlockSynchronizerError::NoSynchronizers);
1048        };
1049
1050        let mut reason = vec![last_error_reason];
1051
1052        sync_streams.iter().for_each(|stream| {
1053            reason.push(format!(
1054                "{} reported as {} at {}",
1055                stream.extractor_id, stream.state, stream.modify_ts
1056            ))
1057        });
1058
1059        Err(BlockSynchronizerError::NoReadySynchronizers(reason.join(", ")))
1060    }
1061}
1062
1063#[cfg(test)]
1064mod tests {
1065    use std::sync::Arc;
1066
1067    use async_trait::async_trait;
1068    use test_log::test;
1069    use tokio::sync::{oneshot, Mutex};
1070    use tycho_common::models::Chain;
1071
1072    use super::*;
1073    use crate::feed::synchronizer::{SyncResult, SynchronizerTaskHandle};
1074
1075    #[derive(Clone, Debug)]
1076    enum MockBehavior {
1077        Normal,          // Exit successfully when receiving close signal
1078        IgnoreClose,     // Ignore close signals and hang (for timeout testing)
1079        ExitImmediately, // Exit immediately after first message (for quick failure testing)
1080    }
1081
1082    type HeaderReceiver = Receiver<SyncResult<StateSyncMessage<BlockHeader>>>;
1083
1084    #[derive(Clone)]
1085    struct MockStateSync {
1086        header_tx: mpsc::Sender<SyncResult<StateSyncMessage<BlockHeader>>>,
1087        header_rx: Arc<Mutex<Option<HeaderReceiver>>>,
1088        close_received: Arc<Mutex<bool>>,
1089        behavior: MockBehavior,
1090        // For testing: store the close sender so tests can trigger close signals
1091        close_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
1092    }
1093
1094    impl MockStateSync {
1095        fn new() -> Self {
1096            Self::with_behavior(MockBehavior::Normal)
1097        }
1098
1099        fn with_behavior(behavior: MockBehavior) -> Self {
1100            let (tx, rx) = mpsc::channel(1);
1101            Self {
1102                header_tx: tx,
1103                header_rx: Arc::new(Mutex::new(Some(rx))),
1104                close_received: Arc::new(Mutex::new(false)),
1105                behavior,
1106                close_tx: Arc::new(Mutex::new(None)),
1107            }
1108        }
1109
1110        async fn was_close_received(&self) -> bool {
1111            *self.close_received.lock().await
1112        }
1113
1114        async fn send_header(&self, header: StateSyncMessage<BlockHeader>) -> Result<(), String> {
1115            self.header_tx
1116                .send(Ok(header))
1117                .await
1118                .map_err(|e| format!("sending header failed: {e}"))
1119        }
1120
1121        // For testing: trigger a close signal to make the synchronizer exit
1122        async fn trigger_close(&self) {
1123            if let Some(close_tx) = self.close_tx.lock().await.take() {
1124                let _ = close_tx.send(());
1125            }
1126        }
1127    }
1128
1129    #[async_trait]
1130    impl StateSynchronizer for MockStateSync {
1131        async fn initialize(&mut self) -> SyncResult<()> {
1132            Ok(())
1133        }
1134
1135        async fn start(
1136            mut self,
1137        ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>) {
1138            let block_rx = {
1139                let mut guard = self.header_rx.lock().await;
1140                guard
1141                    .take()
1142                    .expect("Block receiver was not set!")
1143            };
1144
1145            // Create close channel - we need to store one sender for testing and give one to the
1146            // handle
1147            let (close_tx_for_handle, close_rx) = oneshot::channel();
1148            let (close_tx_for_test, close_rx_for_test) = oneshot::channel();
1149
1150            // Store the test close sender
1151            {
1152                let mut guard = self.close_tx.lock().await;
1153                *guard = Some(close_tx_for_test);
1154            }
1155
1156            let behavior = self.behavior.clone();
1157            let close_received_clone = self.close_received.clone();
1158            let tx = self.header_tx.clone();
1159
1160            let jh = tokio::spawn(async move {
1161                match behavior {
1162                    MockBehavior::IgnoreClose => {
1163                        // Infinite loop to simulate a hung synchronizer that doesn't respond to
1164                        // close signals
1165                        loop {
1166                            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
1167                        }
1168                    }
1169                    MockBehavior::ExitImmediately => {
1170                        // Exit immediately with error to simulate immediate task failure
1171                        tx.send(SyncResult::Err(SynchronizerError::ConnectionError(
1172                            "Simulated immediate task failure".to_string(),
1173                        )))
1174                        .await
1175                        .unwrap();
1176                    }
1177                    MockBehavior::Normal => {
1178                        // Wait for close signal from either handle or test, then respond based on
1179                        // behavior
1180                        let _ = tokio::select! {
1181                            result = close_rx => result,
1182                            result = close_rx_for_test => result,
1183                        };
1184                        let mut guard = close_received_clone.lock().await;
1185                        *guard = true;
1186                    }
1187                }
1188            });
1189
1190            let handle = SynchronizerTaskHandle::new(jh, close_tx_for_handle);
1191            (handle, block_rx)
1192        }
1193    }
1194
1195    fn header_message(block: u8) -> StateSyncMessage<BlockHeader> {
1196        StateSyncMessage {
1197            header: BlockHeader {
1198                number: block as u64,
1199                hash: Bytes::from(vec![block]),
1200                parent_hash: Bytes::from(vec![block - 1]),
1201                revert: false,
1202                timestamp: 1000,
1203                partial_block_index: None,
1204            },
1205            ..Default::default()
1206        }
1207    }
1208
1209    /// Creates a partial block header message with an ephemeral hash encoding block number and
1210    /// partial index.
1211    fn partial_header_message(block: u8, partial_idx: u32) -> StateSyncMessage<BlockHeader> {
1212        // Ephemeral hash encodes block number and partial index for uniqueness
1213        let hash_bytes =
1214            [(block as u64).to_be_bytes().as_slice(), partial_idx.to_be_bytes().as_slice()]
1215                .concat();
1216        StateSyncMessage {
1217            header: BlockHeader {
1218                number: block as u64,
1219                hash: Bytes::from(hash_bytes),
1220                parent_hash: Bytes::from(vec![block - 1]),
1221                revert: false,
1222                timestamp: 1000,
1223                partial_block_index: Some(partial_idx),
1224            },
1225            ..Default::default()
1226        }
1227    }
1228
1229    fn revert_header_message(block: u8) -> StateSyncMessage<BlockHeader> {
1230        StateSyncMessage {
1231            header: BlockHeader {
1232                number: block as u64,
1233                hash: Bytes::from(vec![block]),
1234                parent_hash: Bytes::from(vec![block - 1]),
1235                revert: true,
1236                timestamp: 1000,
1237                partial_block_index: None,
1238            },
1239            ..Default::default()
1240        }
1241    }
1242
1243    async fn receive_message(rx: &mut Receiver<BlockSyncResult<FeedMessage>>) -> FeedMessage {
1244        timeout(Duration::from_millis(100), rx.recv())
1245            .await
1246            .expect("Responds in time")
1247            .expect("Should receive first message")
1248            .expect("No error")
1249    }
1250
1251    async fn setup_block_sync(
1252    ) -> (MockStateSync, MockStateSync, JoinHandle<()>, Receiver<BlockSyncResult<FeedMessage>>)
1253    {
1254        setup_block_sync_with_behaviour(MockBehavior::Normal, MockBehavior::Normal).await
1255    }
1256
1257    // Starts up a synchronizer and consumes the first message on block 1.
1258    async fn setup_block_sync_with_behaviour(
1259        v2_behavior: MockBehavior,
1260        v3_behavior: MockBehavior,
1261    ) -> (MockStateSync, MockStateSync, JoinHandle<()>, Receiver<BlockSyncResult<FeedMessage>>)
1262    {
1263        let v2_sync = MockStateSync::with_behavior(v2_behavior);
1264        let v3_sync = MockStateSync::with_behavior(v3_behavior);
1265
1266        // Use reasonable timeouts to observe proper state transitions
1267        let mut block_sync = BlockSynchronizer::new(
1268            Duration::from_millis(20), // block_time
1269            Duration::from_millis(10), // max_wait
1270            3,                         // max_missed_blocks (stale threshold = 20ms * 3 = 60ms)
1271        );
1272        block_sync.max_messages(10); // Allow enough messages to see the progression
1273
1274        let block_sync = block_sync
1275            .register_synchronizer(
1276                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
1277                v2_sync.clone(),
1278            )
1279            .register_synchronizer(
1280                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
1281                v3_sync.clone(),
1282            );
1283
1284        // Send initial messages to both synchronizers
1285        let block1_msg = header_message(1);
1286        let _ = v2_sync
1287            .send_header(block1_msg.clone())
1288            .await;
1289        let _ = v3_sync
1290            .send_header(block1_msg.clone())
1291            .await;
1292
1293        // Start the block synchronizer
1294        let (nanny_handle, mut rx) = block_sync
1295            .run()
1296            .await
1297            .expect("BlockSynchronizer failed to start");
1298
1299        let first_feed_msg = receive_message(&mut rx).await;
1300        assert_eq!(first_feed_msg.state_msgs.len(), 2);
1301        assert!(matches!(
1302            first_feed_msg
1303                .sync_states
1304                .get("uniswap-v2")
1305                .unwrap(),
1306            SynchronizerState::Ready(_)
1307        ));
1308        assert!(matches!(
1309            first_feed_msg
1310                .sync_states
1311                .get("uniswap-v3")
1312                .unwrap(),
1313            SynchronizerState::Ready(_)
1314        ));
1315
1316        (v2_sync, v3_sync, nanny_handle, rx)
1317    }
1318
1319    async fn shutdown_block_synchronizer(
1320        nanny_handle: JoinHandle<()>,
1321        rx: Receiver<BlockSyncResult<FeedMessage>>,
1322    ) {
1323        // Dropping the receiver causes the main loop's next send to fail, which exits
1324        // the loop. The nanny then calls cleanup_synchronizers, which sends close signals
1325        // to the synchronizer tasks via their handle-side channels.
1326        drop(rx);
1327        timeout(Duration::from_secs(2), nanny_handle)
1328            .await
1329            .expect("Nanny failed to exit within time")
1330            .expect("Nanny panicked");
1331    }
1332
1333    /// Send message to sync and assert it transitions to Ready at expected block/partial
1334    async fn send_and_assert_ready(
1335        sync: &MockStateSync,
1336        sync_name: &str,
1337        rx: &mut Receiver<BlockSyncResult<FeedMessage>>,
1338        msg: StateSyncMessage<BlockHeader>,
1339        expected_block: u64,
1340        expected_partial: Option<u32>,
1341    ) {
1342        sync.send_header(msg)
1343            .await
1344            .expect("send failed");
1345        let feed_msg = receive_message(rx).await;
1346        let state = feed_msg
1347            .sync_states
1348            .get(sync_name)
1349            .unwrap();
1350        match state {
1351            SynchronizerState::Ready(h) => {
1352                assert_eq!(h.number, expected_block, "wrong block number");
1353                assert_eq!(h.partial_block_index, expected_partial, "wrong partial index");
1354            }
1355            other => panic!("expected Ready, got {:?}", other),
1356        }
1357    }
1358
1359    #[test(tokio::test)]
1360    async fn test_two_ready_synchronizers() {
1361        let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1362
1363        let second_msg = header_message(2);
1364        v2_sync
1365            .send_header(second_msg.clone())
1366            .await
1367            .expect("send_header failed");
1368        v3_sync
1369            .send_header(second_msg.clone())
1370            .await
1371            .expect("send_header failed");
1372        let second_feed_msg = receive_message(&mut rx).await;
1373
1374        let exp2 = FeedMessage {
1375            state_msgs: [
1376                ("uniswap-v2".to_string(), second_msg.clone()),
1377                ("uniswap-v3".to_string(), second_msg.clone()),
1378            ]
1379            .into_iter()
1380            .collect(),
1381            sync_states: [
1382                ("uniswap-v3".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
1383                ("uniswap-v2".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
1384            ]
1385            .into_iter()
1386            .collect(),
1387        };
1388        assert_eq!(second_feed_msg, exp2);
1389
1390        shutdown_block_synchronizer(nanny_handle, rx).await;
1391    }
1392
1393    #[test(tokio::test)]
1394    async fn test_delayed_synchronizer_catches_up() {
1395        let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1396
1397        // Send block 2 to v2 synchronizer only
1398        let block2_msg = header_message(2);
1399        v2_sync
1400            .send_header(block2_msg.clone())
1401            .await
1402            .expect("send_header failed");
1403
1404        // Consume second message - v3 should be delayed
1405        let second_feed_msg = receive_message(&mut rx).await;
1406        debug!("Consumed second message for v2");
1407
1408        assert!(second_feed_msg
1409            .state_msgs
1410            .contains_key("uniswap-v2"));
1411        assert!(matches!(
1412            second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1413            SynchronizerState::Ready(header) if header.number == 2
1414        ));
1415        assert!(!second_feed_msg
1416            .state_msgs
1417            .contains_key("uniswap-v3"));
1418        assert!(matches!(
1419            second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1420            SynchronizerState::Delayed(header) if header.number == 1
1421        ));
1422
1423        // Now v3 catches up to block 2
1424        v3_sync
1425            .send_header(block2_msg.clone())
1426            .await
1427            .expect("send_header failed");
1428
1429        // Both advance to block 3
1430        let block3_msg = header_message(3);
1431        v2_sync
1432            .send_header(block3_msg.clone())
1433            .await
1434            .expect("send_header failed");
1435        v3_sync
1436            .send_header(block3_msg)
1437            .await
1438            .expect("send_header failed");
1439
1440        // Consume messages until we get both synchronizers on block 3
1441        // We may get an intermediate message for v3's catch-up or a combined message
1442        let mut third_feed_msg = receive_message(&mut rx).await;
1443
1444        // If this message doesn't have both univ2, it's an intermediate message, so we get the next
1445        // one
1446        if !third_feed_msg
1447            .state_msgs
1448            .contains_key("uniswap-v2")
1449        {
1450            third_feed_msg = rx
1451                .recv()
1452                .await
1453                .expect("header channel was closed")
1454                .expect("no error");
1455        }
1456        assert!(third_feed_msg
1457            .state_msgs
1458            .contains_key("uniswap-v2"));
1459        assert!(third_feed_msg
1460            .state_msgs
1461            .contains_key("uniswap-v3"));
1462        assert!(matches!(
1463            third_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1464            SynchronizerState::Ready(header) if header.number == 3
1465        ));
1466        assert!(matches!(
1467            third_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1468            SynchronizerState::Ready(header) if header.number == 3
1469        ));
1470
1471        shutdown_block_synchronizer(nanny_handle, rx).await;
1472    }
1473
1474    #[test(tokio::test)]
1475    async fn test_different_start_blocks() {
1476        let v2_sync = MockStateSync::new();
1477        let v3_sync = MockStateSync::new();
1478        let block_sync = BlockSynchronizer::with_short_timeouts()
1479            .register_synchronizer(
1480                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
1481                v2_sync.clone(),
1482            )
1483            .register_synchronizer(
1484                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
1485                v3_sync.clone(),
1486            );
1487
1488        // Initial messages - synchronizers at different blocks
1489        let block1_msg = header_message(1);
1490        let block2_msg = header_message(2);
1491
1492        let _ = v2_sync
1493            .send_header(block1_msg.clone())
1494            .await;
1495        v3_sync
1496            .send_header(block2_msg.clone())
1497            .await
1498            .expect("send_header failed");
1499
1500        // Start the block synchronizer - it should use block 2 as the starting block
1501        let (jh, mut rx) = block_sync
1502            .run()
1503            .await
1504            .expect("BlockSynchronizer failed to start.");
1505
1506        // Consume first message
1507        let first_feed_msg = receive_message(&mut rx).await;
1508        assert!(matches!(
1509            first_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1510            SynchronizerState::Delayed(header) if header.number == 1
1511        ));
1512        assert!(matches!(
1513            first_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1514            SynchronizerState::Ready(header) if header.number == 2
1515        ));
1516
1517        // Now v2 catches up to block 2
1518        v2_sync
1519            .send_header(block2_msg.clone())
1520            .await
1521            .expect("send_header failed");
1522
1523        // Both advance to block 3
1524        let block3_msg = header_message(3);
1525        let _ = v2_sync
1526            .send_header(block3_msg.clone())
1527            .await;
1528        v3_sync
1529            .send_header(block3_msg.clone())
1530            .await
1531            .expect("send_header failed");
1532
1533        // Consume third message - both should be on block 3
1534        let second_feed_msg = receive_message(&mut rx).await;
1535        assert_eq!(second_feed_msg.state_msgs.len(), 2);
1536        assert!(matches!(
1537            second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1538            SynchronizerState::Ready(header) if header.number == 3
1539        ));
1540        assert!(matches!(
1541            second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1542            SynchronizerState::Ready(header) if header.number == 3
1543        ));
1544
1545        shutdown_block_synchronizer(jh, rx).await;
1546    }
1547
1548    #[test(tokio::test)]
1549    async fn test_synchronizer_fails_other_goes_stale() {
1550        let (_v2_sync, v3_sync, nanny_handle, mut sync_rx) =
1551            setup_block_sync_with_behaviour(MockBehavior::ExitImmediately, MockBehavior::Normal)
1552                .await;
1553
1554        let mut error_reported = false;
1555        for _ in 0..3 {
1556            if let Some(msg) = sync_rx.recv().await {
1557                match msg {
1558                    Err(_) => error_reported = true,
1559                    Ok(msg) => {
1560                        assert!(matches!(
1561                            msg.sync_states
1562                                .get("uniswap-v3")
1563                                .unwrap(),
1564                            SynchronizerState::Delayed(_)
1565                        ));
1566                        assert!(matches!(
1567                            msg.sync_states
1568                                .get("uniswap-v2")
1569                                .unwrap(),
1570                            SynchronizerState::Ended(_)
1571                        ));
1572                    }
1573                }
1574            }
1575        }
1576        assert!(error_reported, "BlockSynchronizer did not report final error");
1577
1578        // Wait for nanny to detect task failure and execute cleanup
1579        let result = timeout(Duration::from_secs(2), nanny_handle).await;
1580        assert!(result.is_ok(), "Nanny should complete when synchronizer task exits");
1581
1582        // Verify that the remaining synchronizer received close signal during cleanup
1583        assert!(
1584            v3_sync.was_close_received().await,
1585            "v3_sync should have received close signal during cleanup"
1586        );
1587    }
1588
1589    #[test(tokio::test)]
1590    async fn test_cleanup_timeout_warning() {
1591        // Verify that cleanup_synchronizers emits a warning when synchronizers timeout during
1592        // cleanup
1593        let (_v2_sync, _v3_sync, nanny_handle, _rx) = setup_block_sync_with_behaviour(
1594            MockBehavior::ExitImmediately,
1595            MockBehavior::IgnoreClose,
1596        )
1597        .await;
1598
1599        // Wait for nanny to complete - cleanup should timeout on v3_sync but still complete
1600        let result = timeout(Duration::from_secs(10), nanny_handle).await;
1601        assert!(
1602            result.is_ok(),
1603            "Nanny should complete even when some synchronizers timeout during cleanup"
1604        );
1605
1606        // Note: In a real test environment, we would capture log output to verify the warning was
1607        // emitted. Since this is a unit test without log capture setup, we just verify that
1608        // cleanup completes even when some synchronizers timeout.
1609    }
1610
1611    #[test(tokio::test)]
1612    async fn test_one_synchronizer_goes_stale_while_other_works() {
1613        // Test Case 1: One protocol goes stale and is removed while another protocol works normally
1614        let (_v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1615
1616        // Send block 2 only to v3, v2 will timeout and become delayed
1617        let block2_msg = header_message(2);
1618        let _ = v3_sync
1619            .send_header(block2_msg.clone())
1620            .await;
1621        // Don't send to v2_sync - it will timeout
1622
1623        // Consume second message - v2 should be delayed, v3 ready
1624        let second_feed_msg = receive_message(&mut rx).await;
1625        assert!(second_feed_msg
1626            .state_msgs
1627            .contains_key("uniswap-v3"));
1628        assert!(!second_feed_msg
1629            .state_msgs
1630            .contains_key("uniswap-v2"));
1631        assert!(matches!(
1632            second_feed_msg
1633                .sync_states
1634                .get("uniswap-v3")
1635                .unwrap(),
1636            SynchronizerState::Ready(_)
1637        ));
1638        // v2 should be delayed (if still present) - check nanny is still running
1639        if let Some(v2_state) = second_feed_msg
1640            .sync_states
1641            .get("uniswap-v2")
1642        {
1643            if matches!(v2_state, SynchronizerState::Delayed(_)) {
1644                // Verify nanny is still running when synchronizer is just delayed
1645                assert!(
1646                    !nanny_handle.is_finished(),
1647                    "Nanny should still be running when synchronizer is delayed (not stale yet)"
1648                );
1649            }
1650        }
1651
1652        // Wait a bit, then continue sending blocks to v3 but not v2
1653        tokio::time::sleep(Duration::from_millis(15)).await;
1654
1655        // Continue sending blocks only to v3 to keep it healthy while v2 goes stale
1656        let block3_msg = header_message(3);
1657        let _ = v3_sync
1658            .send_header(block3_msg.clone())
1659            .await;
1660
1661        tokio::time::sleep(Duration::from_millis(40)).await;
1662
1663        let mut stale_found = false;
1664        for _ in 0..2 {
1665            if let Some(Ok(msg)) = rx.recv().await {
1666                if let Some(SynchronizerState::Stale(_)) = msg.sync_states.get("uniswap-v2") {
1667                    stale_found = true;
1668                }
1669            }
1670        }
1671        assert!(stale_found, "v2 synchronizer should be stale");
1672
1673        shutdown_block_synchronizer(nanny_handle, rx).await;
1674    }
1675
1676    #[test(tokio::test)]
1677    async fn test_all_synchronizers_stale_loop_continues() {
1678        // When all synchronizers go stale simultaneously (simulating a WS reconnect), the
1679        // main loop must NOT exit — it should keep running and wait for recovery.
1680        let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1681
1682        // Stop sending messages — both should timeout, go Delayed, then Stale.
1683        let mut seen_delayed = false;
1684        let mut seen_stale = false;
1685        let start_time = tokio::time::Instant::now();
1686
1687        while let Ok(Some(Ok(msg))) =
1688            tokio::time::timeout(Duration::from_millis(50), rx.recv()).await
1689        {
1690            let v2_state = msg.sync_states.get("uniswap-v2");
1691            let v3_state = msg.sync_states.get("uniswap-v3");
1692
1693            if !seen_delayed &&
1694                (matches!(v2_state, Some(SynchronizerState::Delayed(_))) ||
1695                    matches!(v3_state, Some(SynchronizerState::Delayed(_))))
1696            {
1697                seen_delayed = true;
1698                assert!(
1699                    !nanny_handle.is_finished(),
1700                    "Nanny must still run when synchronizers are Delayed"
1701                );
1702            }
1703
1704            if matches!(v2_state, Some(SynchronizerState::Stale(_))) &&
1705                matches!(v3_state, Some(SynchronizerState::Stale(_)))
1706            {
1707                seen_stale = true;
1708                assert!(
1709                    !nanny_handle.is_finished(),
1710                    "Main loop must not exit when all synchronizers are Stale (awaiting recovery)"
1711                );
1712                break;
1713            }
1714
1715            if start_time.elapsed() > Duration::from_millis(500) {
1716                break;
1717            }
1718        }
1719
1720        assert!(seen_delayed, "Synchronizers should transition through Delayed first");
1721        assert!(seen_stale, "Both synchronizers should reach Stale state");
1722
1723        // Simulate the underlying synchronizer tasks permanently dying:
1724        // trigger_close makes the mock tasks exit (dropping their tx clones), and
1725        // dropping the MockStateSyncs closes the original sender side.
1726        // This causes SynchronizerStream.rx to return None → Ended → check_streams error.
1727        v2_sync.trigger_close().await;
1728        v3_sync.trigger_close().await;
1729        drop(v2_sync);
1730        drop(v3_sync);
1731
1732        // Now the main loop should detect all-ended and report an error.
1733        let mut error_reported = false;
1734        while let Some(msg) = rx.recv().await {
1735            if msg.is_err() {
1736                error_reported = true;
1737            }
1738        }
1739        assert!(error_reported, "Expected an error after all synchronizers ended");
1740
1741        let nanny_result = timeout(Duration::from_secs(2), nanny_handle).await;
1742        assert!(nanny_result.is_ok(), "Nanny should complete after all synchronizers ended");
1743    }
1744
1745    #[test(tokio::test)]
1746    async fn test_all_synchronizers_recover_after_going_stale() {
1747        // Simulates a full WS reconnect: both synchronizers go stale, then reconnect
1748        // and send an advanced block. The main loop should rebase block history and resume.
1749        let v2_sync = MockStateSync::new();
1750        let v3_sync = MockStateSync::new();
1751        let block_sync =
1752            BlockSynchronizer::new(Duration::from_millis(20), Duration::from_millis(10), 3)
1753                .register_synchronizer(
1754                    ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
1755                    v2_sync.clone(),
1756                )
1757                .register_synchronizer(
1758                    ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
1759                    v3_sync.clone(),
1760                );
1761
1762        v2_sync
1763            .send_header(header_message(1))
1764            .await
1765            .unwrap();
1766        v3_sync
1767            .send_header(header_message(1))
1768            .await
1769            .unwrap();
1770
1771        let (nanny_handle, mut rx) = block_sync
1772            .run()
1773            .await
1774            .expect("BlockSynchronizer start failed");
1775
1776        let first_msg = receive_message(&mut rx).await;
1777        assert!(matches!(
1778            first_msg.sync_states.get("uniswap-v2").unwrap(),
1779            SynchronizerState::Ready(h) if h.number == 1
1780        ));
1781        assert!(matches!(
1782            first_msg.sync_states.get("uniswap-v3").unwrap(),
1783            SynchronizerState::Ready(h) if h.number == 1
1784        ));
1785
1786        // Stop sending messages — both should go Stale.
1787        let mut seen_stale = false;
1788        let start_time = tokio::time::Instant::now();
1789        while let Ok(Some(Ok(msg))) =
1790            tokio::time::timeout(Duration::from_millis(50), rx.recv()).await
1791        {
1792            let v2 = msg
1793                .sync_states
1794                .get("uniswap-v2")
1795                .unwrap();
1796            let v3 = msg
1797                .sync_states
1798                .get("uniswap-v3")
1799                .unwrap();
1800            if matches!(v2, SynchronizerState::Stale(_)) &&
1801                matches!(v3, SynchronizerState::Stale(_))
1802            {
1803                seen_stale = true;
1804                assert!(
1805                    !nanny_handle.is_finished(),
1806                    "Main loop must not exit while synchronizers are Stale"
1807                );
1808                break;
1809            }
1810            if start_time.elapsed() > Duration::from_millis(500) {
1811                break;
1812            }
1813        }
1814        assert!(seen_stale, "Both synchronizers should go Stale");
1815
1816        // Simulate reconnect: send block 5 (advanced — not connected to block 1).
1817        v2_sync
1818            .send_header(header_message(5))
1819            .await
1820            .unwrap();
1821        v3_sync
1822            .send_header(header_message(5))
1823            .await
1824            .unwrap();
1825
1826        // The main loop should detect the advanced blocks, rebase block history, and
1827        // reclassify both synchronizers as Ready at block 5.
1828        let mut recovered = false;
1829        for _ in 0..20 {
1830            let msg = receive_message(&mut rx).await;
1831            let v2 = msg
1832                .sync_states
1833                .get("uniswap-v2")
1834                .unwrap();
1835            let v3 = msg
1836                .sync_states
1837                .get("uniswap-v3")
1838                .unwrap();
1839            if matches!(v2, SynchronizerState::Ready(h) if h.number == 5) &&
1840                matches!(v3, SynchronizerState::Ready(h) if h.number == 5)
1841            {
1842                recovered = true;
1843                break;
1844            }
1845        }
1846        assert!(recovered, "Both synchronizers should recover to Ready at block 5");
1847
1848        // Drop rx to signal the main loop to stop, then await nanny cleanup.
1849        drop(rx);
1850        timeout(Duration::from_secs(2), nanny_handle)
1851            .await
1852            .expect("Nanny timed out")
1853            .expect("Nanny panicked");
1854    }
1855
1856    #[test(tokio::test)]
1857    async fn test_stale_synchronizer_recovers() {
1858        // Test Case 2: All protocols go stale and main loop exits gracefully
1859        let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1860
1861        // Send second messages to v2 only, shortly before both would go stale
1862        tokio::time::sleep(Duration::from_millis(50)).await;
1863        let block2_msg = header_message(2);
1864        let _ = v2_sync
1865            .send_header(block2_msg.clone())
1866            .await;
1867
1868        // we should get two messages here
1869        for _ in 0..2 {
1870            if let Some(msg) = rx.recv().await {
1871                if let Ok(msg) = msg {
1872                    if matches!(
1873                        msg.sync_states
1874                            .get("uniswap-v2")
1875                            .unwrap(),
1876                        SynchronizerState::Ready(_)
1877                    ) {
1878                        assert!(matches!(
1879                            msg.sync_states
1880                                .get("uniswap-v3")
1881                                .unwrap(),
1882                            SynchronizerState::Delayed(_)
1883                        ));
1884                        break;
1885                    };
1886                }
1887            } else {
1888                panic!("Channel closed unexpectedly")
1889            }
1890        }
1891
1892        // Now v3 should be stale
1893        tokio::time::sleep(Duration::from_millis(15)).await;
1894        let block3_msg = header_message(3);
1895        let _ = v2_sync
1896            .send_header(block3_msg.clone())
1897            .await;
1898        let third_msg = receive_message(&mut rx).await;
1899        dbg!(&third_msg);
1900        assert!(matches!(
1901            third_msg
1902                .sync_states
1903                .get("uniswap-v2")
1904                .unwrap(),
1905            SynchronizerState::Ready(_)
1906        ));
1907        assert!(matches!(
1908            third_msg
1909                .sync_states
1910                .get("uniswap-v3")
1911                .unwrap(),
1912            SynchronizerState::Stale(_)
1913        ));
1914
1915        let block4_msg = header_message(4);
1916        let _ = v3_sync
1917            .send_header(block2_msg.clone())
1918            .await;
1919        let _ = v3_sync
1920            .send_header(block3_msg.clone())
1921            .await;
1922        let _ = v3_sync
1923            .send_header(block4_msg.clone())
1924            .await;
1925        let _ = v2_sync
1926            .send_header(block4_msg.clone())
1927            .await;
1928        let fourth_msg = receive_message(&mut rx).await;
1929        assert!(matches!(
1930            fourth_msg
1931                .sync_states
1932                .get("uniswap-v2")
1933                .unwrap(),
1934            SynchronizerState::Ready(_)
1935        ));
1936        assert!(matches!(
1937            fourth_msg
1938                .sync_states
1939                .get("uniswap-v3")
1940                .unwrap(),
1941            SynchronizerState::Ready(_)
1942        ));
1943
1944        shutdown_block_synchronizer(nanny_handle, rx).await;
1945
1946        // Verify cleanup was triggered for both synchronizers
1947        assert!(
1948            v2_sync.was_close_received().await,
1949            "v2_sync should have received close signal during cleanup"
1950        );
1951        assert!(
1952            v3_sync.was_close_received().await,
1953            "v3_sync should have received close signal during cleanup"
1954        );
1955    }
1956
1957    #[test(tokio::test)]
1958    async fn test_all_synchronizer_advanced() {
1959        // Test the case were all synchronizers successfully recover but stream
1960        // from a disconnected future block.
1961
1962        let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1963
1964        let block3 = header_message(3);
1965        v2_sync
1966            .send_header(block3.clone())
1967            .await
1968            .unwrap();
1969        v3_sync
1970            .send_header(block3)
1971            .await
1972            .unwrap();
1973
1974        let msg = receive_message(&mut rx).await;
1975        matches!(
1976            msg.sync_states
1977                .get("uniswap-v2")
1978                .unwrap(),
1979            SynchronizerState::Ready(_)
1980        );
1981        matches!(
1982            msg.sync_states
1983                .get("uniswap-v3")
1984                .unwrap(),
1985            SynchronizerState::Ready(_)
1986        );
1987
1988        shutdown_block_synchronizer(nanny_handle, rx).await;
1989    }
1990
1991    #[test(tokio::test)]
1992    async fn test_one_synchronizer_advanced() {
1993        let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1994
1995        let block2 = header_message(2);
1996        let block4 = header_message(4);
1997        v2_sync
1998            .send_header(block4.clone())
1999            .await
2000            .unwrap();
2001        v3_sync
2002            .send_header(block2.clone())
2003            .await
2004            .unwrap();
2005
2006        let msg = receive_message(&mut rx).await;
2007        matches!(
2008            msg.sync_states
2009                .get("uniswap-v2")
2010                .unwrap(),
2011            SynchronizerState::Ready(_)
2012        );
2013        matches!(
2014            msg.sync_states
2015                .get("uniswap-v3")
2016                .unwrap(),
2017            SynchronizerState::Delayed(_)
2018        );
2019
2020        shutdown_block_synchronizer(nanny_handle, rx).await;
2021    }
2022
2023    #[test(tokio::test)]
2024    async fn test_partial_blocks_normal_operation() {
2025        // Normal operation: partials with arbitrary index increments, then advance to next block
2026        // Scenario: block 1 → partials 0,3,7 for block 2 → partials 0,2 for block 3
2027        let (v2_sync, _v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
2028
2029        // Partials for block 2 with arbitrary increments (only v2, v3 ignored)
2030        send_and_assert_ready(
2031            &v2_sync,
2032            "uniswap-v2",
2033            &mut rx,
2034            partial_header_message(2, 0),
2035            2,
2036            Some(0),
2037        )
2038        .await;
2039        send_and_assert_ready(
2040            &v2_sync,
2041            "uniswap-v2",
2042            &mut rx,
2043            partial_header_message(2, 3),
2044            2,
2045            Some(3),
2046        )
2047        .await;
2048        send_and_assert_ready(
2049            &v2_sync,
2050            "uniswap-v2",
2051            &mut rx,
2052            partial_header_message(2, 7),
2053            2,
2054            Some(7),
2055        )
2056        .await;
2057
2058        // Advance to block 3 with partials
2059        send_and_assert_ready(
2060            &v2_sync,
2061            "uniswap-v2",
2062            &mut rx,
2063            partial_header_message(3, 0),
2064            3,
2065            Some(0),
2066        )
2067        .await;
2068        send_and_assert_ready(
2069            &v2_sync,
2070            "uniswap-v2",
2071            &mut rx,
2072            partial_header_message(3, 2),
2073            3,
2074            Some(2),
2075        )
2076        .await;
2077
2078        shutdown_block_synchronizer(nanny_handle, rx).await;
2079    }
2080
2081    #[test(tokio::test)]
2082    async fn test_partial_blocks_handles_reverts() {
2083        // Revert resets state and allows continuation on new fork with full blocks
2084        // Scenario: block 1 → block 2 → partials for block 3 → revert to 2 → full block 3
2085        let (v2_sync, _v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
2086
2087        // Advance to full block 2 (only v2, v3 ignored)
2088        send_and_assert_ready(&v2_sync, "uniswap-v2", &mut rx, header_message(2), 2, None).await;
2089
2090        // Partials for block 3
2091        send_and_assert_ready(
2092            &v2_sync,
2093            "uniswap-v2",
2094            &mut rx,
2095            partial_header_message(3, 0),
2096            3,
2097            Some(0),
2098        )
2099        .await;
2100        send_and_assert_ready(
2101            &v2_sync,
2102            "uniswap-v2",
2103            &mut rx,
2104            partial_header_message(3, 2),
2105            3,
2106            Some(2),
2107        )
2108        .await;
2109
2110        // Revert to block 2
2111        send_and_assert_ready(&v2_sync, "uniswap-v2", &mut rx, revert_header_message(2), 2, None)
2112            .await;
2113
2114        // New fork: full block 3
2115        send_and_assert_ready(&v2_sync, "uniswap-v2", &mut rx, header_message(3), 3, None).await;
2116
2117        shutdown_block_synchronizer(nanny_handle, rx).await;
2118    }
2119
2120    #[test(tokio::test)]
2121    async fn test_partial_blocks_delayed_synchronizer_catches_up() {
2122        // Delayed synchronizer catches up when receiving partial blocks
2123        // Scenario: v2 receives partials while v3 is delayed, then v3 catches up
2124        let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
2125
2126        // v2 receives partial 0 for block 2; v3 times out and becomes Delayed
2127        let partial_0 = partial_header_message(2, 0);
2128        v2_sync
2129            .send_header(partial_0.clone())
2130            .await
2131            .expect("send partial 0 failed");
2132
2133        let msg = receive_message(&mut rx).await;
2134        // v2 is Ready with partial 0, v3 is Delayed
2135        assert!(msg
2136            .state_msgs
2137            .contains_key("uniswap-v2"));
2138        assert!(!msg
2139            .state_msgs
2140            .contains_key("uniswap-v3"));
2141        assert!(matches!(
2142            msg.sync_states.get("uniswap-v2").unwrap(),
2143            SynchronizerState::Ready(h) if h.partial_block_index == Some(0)
2144        ));
2145        assert!(matches!(
2146            msg.sync_states
2147                .get("uniswap-v3")
2148                .unwrap(),
2149            SynchronizerState::Delayed(_)
2150        ));
2151
2152        // v2 advances to partial 2; v3 catches up by sending partial 0 then partial 2
2153        let partial_2 = partial_header_message(2, 2);
2154        v2_sync
2155            .send_header(partial_2.clone())
2156            .await
2157            .expect("send partial 2 failed");
2158        v3_sync
2159            .send_header(partial_0.clone())
2160            .await
2161            .expect("v3 catch up partial 0 failed");
2162        v3_sync
2163            .send_header(partial_2.clone())
2164            .await
2165            .expect("v3 catch up partial 2 failed");
2166
2167        // v3 catches up within a few message cycles
2168        let mut v3_ready = false;
2169        for _ in 0..3 {
2170            let msg = receive_message(&mut rx).await;
2171            if matches!(
2172                msg.sync_states.get("uniswap-v3").unwrap(),
2173                SynchronizerState::Ready(h) if h.partial_block_index == Some(2)
2174            ) {
2175                v3_ready = true;
2176                break;
2177            }
2178        }
2179        assert!(v3_ready, "v3 caught up to partial 2");
2180
2181        shutdown_block_synchronizer(nanny_handle, rx).await;
2182    }
2183}