Skip to main content

tycho_client/feed/mod.rs

1use std::{
2    collections::{HashMap, HashSet},
3    fmt::{Display, Formatter},
4    time::Duration,
5};
6
7use chrono::{Duration as ChronoDuration, Local, NaiveDateTime};
8use futures03::{
9    future::{join_all, try_join_all},
10    stream::FuturesUnordered,
11    StreamExt,
12};
13use serde::{Deserialize, Serialize};
14use thiserror::Error;
15use tokio::{
16    sync::{
17        mpsc::{self, Receiver},
18        oneshot,
19    },
20    task::JoinHandle,
21    time::timeout,
22};
23use tracing::{debug, error, info, trace, warn};
24use tycho_common::{
25    display::opt,
26    dto::{BlockChanges, ExtractorIdentity},
27    Bytes,
28};
29
30use crate::feed::{
31    block_history::{BlockHistory, BlockHistoryError, BlockPosition},
32    synchronizer::{StateSyncMessage, StateSynchronizer, SyncResult, SynchronizerError},
33};
34
35mod block_history;
36pub mod component_tracker;
37pub mod synchronizer;
38
/// A trait representing a minimal interface for types that behave like a block header.
///
/// This abstraction allows working with either full block headers (`BlockHeader`)
/// or simplified structures that only provide a timestamp (e.g., for RFQ logic).
pub trait HeaderLike {
    /// Returns the full [`BlockHeader`] if this type carries one, `None` otherwise.
    fn block(self) -> Option<BlockHeader>;
    /// Returns the value used to position this header: the block number for
    /// [`BlockHeader`]; presumably a timestamp for timestamp-only implementors (confirm
    /// against those implementations).
    fn block_number_or_timestamp(self) -> u64;
}
47
/// Header of a block, or of a partial update within a block, as tracked by the block
/// synchronizer.
#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, Eq, Hash)]
pub struct BlockHeader {
    /// Hash of the block.
    pub hash: Bytes,
    /// Block number (height).
    pub number: u64,
    /// Hash of the parent block.
    pub parent_hash: Bytes,
    /// Revert flag; copied from `BlockChanges::revert` when built from a `BlockChanges`.
    pub revert: bool,
    /// Block timestamp; when built from a `BlockChanges` this is in Unix seconds (UTC).
    pub timestamp: u64,
    /// Index of a partial block update within a block. None for full blocks.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub partial_block_index: Option<u32>,
}
59
60impl BlockHeader {
61    fn is_partial(&self) -> bool {
62        self.partial_block_index.is_some()
63    }
64}
65
66impl Display for BlockHeader {
67    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
68        // Take first 6 hex chars of the hash for readability
69        let short_hash = if self.hash.len() >= 4 {
70            hex::encode(&self.hash[..4]) // 4 bytes → 8 hex chars
71        } else {
72            hex::encode(&self.hash)
73        };
74
75        match self.partial_block_index {
76            Some(idx) => write!(f, "Block #{} [0x{}..] (partial {})", self.number, short_hash, idx),
77            None => write!(f, "Block #{} [0x{}..]", self.number, short_hash),
78        }
79    }
80}
81
82impl From<&BlockChanges> for BlockHeader {
83    fn from(block_changes: &BlockChanges) -> Self {
84        let block = &block_changes.block;
85        Self {
86            hash: block.hash.clone(),
87            number: block.number,
88            parent_hash: block.parent_hash.clone(),
89            revert: block_changes.revert,
90            timestamp: block.ts.and_utc().timestamp() as u64,
91            partial_block_index: block_changes.partial_block_index,
92        }
93    }
94}
95
96impl HeaderLike for BlockHeader {
97    fn block(self) -> Option<BlockHeader> {
98        Some(self)
99    }
100
101    fn block_number_or_timestamp(self) -> u64 {
102        self.number
103    }
104}
105
/// Errors produced by the [`BlockSynchronizer`].
#[derive(Error, Debug)]
pub enum BlockSynchronizerError {
    /// Initializing one of the registered synchronizers failed.
    #[error("Failed to initialize synchronizer: {0}")]
    InitializationError(#[from] SynchronizerError),

    /// Building the block history or classifying a block against it failed.
    #[error("Failed to process new block: {0}")]
    BlockHistoryError(#[from] BlockHistoryError),

    /// No synchronizer was in a ready state.
    #[error("Not a single synchronizer was ready: {0}")]
    NoReadySynchronizers(String),

    /// `run` was called without any registered synchronizer.
    #[error("No synchronizers were set")]
    NoSynchronizers,

    /// A `std::time::Duration` could not be converted into a `chrono::Duration`.
    #[error("Failed to convert duration: {0}")]
    DurationConversionError(String),
}
123
/// Result type used throughout the block synchronizer, failing with [`BlockSynchronizerError`].
type BlockSyncResult<T> = Result<T, BlockSynchronizerError>;
125
126/// Aligns multiple StateSynchronizers on the block dimension.
127///
128/// ## Purpose
129/// The purpose of this component is to handle streams from multiple state synchronizers and
130/// align/merge them according to their blocks. Ideally this should be done in a fault-tolerant way,
131/// meaning we can recover from a state synchronizer suffering from timing issues. E.g. a delayed or
132/// unresponsive state synchronizer might recover again, or an advanced state synchronizer can be
133/// included again once we reach the block it is at.
134///
135/// ## Limitations
136/// - Supports only chains with fixed blocks time for now due to the lock step mechanism.
137///
138/// ## Initialisation
139/// Queries all registered synchronizers for their first message and evaluates the state of each
140/// synchronizer. If a synchronizer's first message is an older block, it is marked as delayed.
141// TODO: what is the startup timeout
142/// If no message is received within the startup timeout, the synchronizer is marked as stale and is
143/// closed.
144///
145/// ## Main loop
146/// Once started, the synchronizers are queried concurrently for messages in lock step:
147/// the main loop queries all synchronizers in ready for the last emitted data, builds the
148/// `FeedMessage` and emits it, then it schedules the wait procedure for the next block.
149///
150/// ## Synchronization Logic
151///
152/// To classify a synchronizer as delayed, we need to first define the current block. The highest
153/// block number of all ready synchronizers is considered the current block.
154///
155/// Once we have the current block we can easily determine which block we expect next. And if a
156/// synchronizer delivers an older block we can classify it as delayed.
157///
158/// If any synchronizer is not in the ready state we will try to bring it back to the ready state.
159/// This is done by trying to empty any buffers of a delayed synchronizer or waiting to reach
160/// the height of an advanced synchronizer (and flagging it as such in the meantime).
161///
162/// Of course, we can't wait forever for a synchronizer to reply/recover. All of this must happen
163/// within the block production step of the blockchain:
164/// The wait procedure consists of waiting for any of the individual ProtocolStateSynchronizers
165/// to emit a new message (within a max timeout - several multiples of the block time). Once a
166/// message is received a very short timeout starts for the remaining synchronizers, to deliver a
167/// message. Any synchronizer failing to do so is transitioned to delayed.
168///
169/// ### Note
170/// The described process above is the goal. It is currently not implemented like that. Instead we
171/// simply wait `block_time` + `wait_time`. Synchronizers are expected to respond within that
172/// timeout. This is simpler but only works well on chains with fixed block times.
173pub struct BlockSynchronizer<S> {
174    synchronizers: Option<HashMap<ExtractorIdentity, S>>,
175    /// Time to wait for a block usually
176    block_time: std::time::Duration,
177    /// Added on top of block time to account for latency
178    latency_buffer: std::time::Duration,
179    /// Time to wait for the full first message, including snapshot retrieval
180    startup_timeout: std::time::Duration,
181    /// Optionally, end the stream after emitting max messages
182    max_messages: Option<usize>,
183    /// Amount of blocks a protocol can be delayed for, before it is considered stale
184    max_missed_blocks: u64,
185}
186
/// Alignment state of a single synchronizer stream, reported to consumers for every
/// synchronizer in `FeedMessage::sync_states`.
///
/// Serialized with an internal `"status"` tag holding the lowercase variant name.
// NOTE(review): serde's internally tagged representation only supports newtype variants that
// wrap structs or maps. `Ended(String)` wraps a plain string, so (de)serializing that variant is
// expected to fail at runtime. Confirm, and consider a struct variant or adjacent tagging.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "lowercase")]
pub enum SynchronizerState {
    /// Initial state, assigned before trying to receive any message
    Started,
    /// The synchronizer emitted a message for the block as expected
    Ready(BlockHeader),
    /// The synchronizer is on a previous block compared to others and is expected to
    /// catch up soon.
    Delayed(BlockHeader),
    /// The synchronizer hasn't emitted messages for > `max_missed_blocks` or has
    /// fallen far behind. At this point we do not wait for it anymore but it can
    /// still eventually recover.
    ///
    /// Within `SynchronizerStream` this is entered when the time since the last state change
    /// exceeds a stale threshold. It is also entered, with a default header, when no first
    /// message arrives within the startup timeout.
    Stale(BlockHeader),
    /// The synchronizer is on a future block that is not connected to the current history.
    // For this to happen we must have a gap, and a gap usually means a new snapshot from the
    // StateSynchronizer. This can only happen if we are processing too slowly and one or all of
    // the synchronizers restart, e.g. due to websocket connection drops.
    Advanced(BlockHeader),
    /// The synchronizer ended. Holds the error message, or `"Closed"` when the channel closed
    /// without a prior error.
    Ended(String),
}
209
210impl Display for SynchronizerState {
211    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
212        match self {
213            SynchronizerState::Started => write!(f, "Started"),
214            SynchronizerState::Ready(b) => write!(f, "Started({})", b.number),
215            SynchronizerState::Delayed(b) => write!(f, "Delayed({})", b.number),
216            SynchronizerState::Stale(b) => write!(f, "Stale({})", b.number),
217            SynchronizerState::Advanced(b) => write!(f, "Advanced({})", b.number),
218            SynchronizerState::Ended(reason) => write!(f, "Ended({})", reason),
219        }
220    }
221}
222
/// Wraps the message receiver of a single state synchronizer together with the bookkeeping
/// needed to align it with the other synchronizers.
pub struct SynchronizerStream {
    /// Identity of the extractor this stream belongs to.
    extractor_id: ExtractorIdentity,
    /// Current alignment state of the stream.
    state: SynchronizerState,
    /// Error that ended the stream, if any (recorded by `mark_errored`).
    error: Option<SynchronizerError>,
    /// Naive UTC time of the last state change; compared against the stale threshold.
    modify_ts: NaiveDateTime,
    /// Receiving end of the synchronizer's message channel.
    rx: Receiver<SyncResult<StateSyncMessage<BlockHeader>>>,
}
230
231impl SynchronizerStream {
232    fn new(
233        extractor_id: &ExtractorIdentity,
234        rx: Receiver<SyncResult<StateSyncMessage<BlockHeader>>>,
235    ) -> Self {
236        Self {
237            extractor_id: extractor_id.clone(),
238            state: SynchronizerState::Started,
239            error: None,
240            modify_ts: Local::now().naive_utc(),
241            rx,
242        }
243    }
244    async fn try_advance(
245        &mut self,
246        block_history: &BlockHistory,
247        block_time: std::time::Duration,
248        latency_buffer: std::time::Duration,
249        stale_threshold: std::time::Duration,
250    ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
251        let extractor_id = self.extractor_id.clone();
252        let latest_block = block_history.latest();
253
254        match &self.state {
255            SynchronizerState::Started | SynchronizerState::Ended(_) => {
256                warn!(state=?&self.state, "Advancing Synchronizer in this state not supported!");
257                Ok(None)
258            }
259            SynchronizerState::Advanced(b) => {
260                let future_block = b.clone();
261                // Transition to ready once we arrived at the expected height
262                self.transition(future_block, block_history, stale_threshold)?;
263                Ok(None)
264            }
265            SynchronizerState::Ready(previous_block) => {
266                // Try to recv the next expected block, update state accordingly.
267                self.try_recv_next_expected(
268                    block_time + latency_buffer,
269                    block_history,
270                    previous_block.clone(),
271                    stale_threshold,
272                )
273                .await
274            }
275            SynchronizerState::Delayed(old_block) => {
276                // try to catch up all currently queued blocks until the expected block
277                debug!(
278                    %old_block,
279                    latest_block=opt(&latest_block),
280                    %extractor_id,
281                    "Trying to catch up to latest block"
282                );
283                self.try_catch_up(block_history, block_time + latency_buffer, stale_threshold)
284                    .await
285            }
286            SynchronizerState::Stale(old_block) => {
287                // try to catch up all currently queued blocks until the expected block
288                debug!(
289                    %old_block,
290                    latest_block=opt(&latest_block),
291                    %extractor_id,
292                    "Trying to catch up stale synchronizer to latest block"
293                );
294                self.try_catch_up(block_history, block_time, stale_threshold)
295                    .await
296            }
297        }
298    }
299
300    /// Standard way to advance a well-behaved state synchronizer.
301    ///
302    /// Will wait for a new block on the synchronizer within a timeout. And modify its
303    /// state based on the outcome.
304    ///
305    /// ## Note
306    /// This method assumes that the current state is `Ready`.
307    async fn try_recv_next_expected(
308        &mut self,
309        max_wait: std::time::Duration,
310        block_history: &BlockHistory,
311        previous_block: BlockHeader,
312        stale_threshold: std::time::Duration,
313    ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
314        let extractor_id = self.extractor_id.clone();
315        match timeout(max_wait, self.rx.recv()).await {
316            Ok(Some(Ok(msg))) => {
317                self.transition(msg.header.clone(), block_history, stale_threshold)?;
318                Ok(Some(msg))
319            }
320            Ok(Some(Err(e))) => {
321                // The underlying synchronizer exhausted its retries
322                self.mark_errored(e);
323                Ok(None)
324            }
325            Ok(None) => {
326                // This case should not happen, as we shouldn't poll the synchronizer after we
327                // closed it or after it errored.
328                warn!(
329                    %extractor_id,
330                    "Tried to poll from closed synchronizer.",
331                );
332                self.mark_closed();
333                Ok(None)
334            }
335            Err(_) => {
336                // trying to advance a block timed out
337                debug!(%extractor_id, %previous_block, "No block received within time limit.");
338                // No need to consider state since we only call this method if we are
339                // in the Ready state.
340                self.state = SynchronizerState::Delayed(previous_block.clone());
341                self.modify_ts = Local::now().naive_utc();
342                Ok(None)
343            }
344        }
345    }
346
347    /// Tries to catch up a delayed state synchronizer.
348    ///
349    /// If a synchronizer is delayed, this method will try to catch up to the next expected block
350    /// by consuming all waiting messages in its queue and waiting for any new block messages
351    /// within a timeout. Finally, all update messages are merged into one and returned.
352    async fn try_catch_up(
353        &mut self,
354        block_history: &BlockHistory,
355        max_wait: std::time::Duration,
356        stale_threshold: std::time::Duration,
357    ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
358        let mut results = Vec::new();
359        let extractor_id = self.extractor_id.clone();
360
361        // Set a deadline for the overall catch-up operation
362        let deadline = std::time::Instant::now() + max_wait;
363
364        while std::time::Instant::now() < deadline {
365            match timeout(
366                deadline.saturating_duration_since(std::time::Instant::now()),
367                self.rx.recv(),
368            )
369            .await
370            {
371                Ok(Some(Ok(msg))) => {
372                    debug!(%extractor_id, block=%msg.header, "Received new message during catch-up");
373                    let block_pos = block_history.determine_block_position(&msg.header)?;
374                    results.push(msg);
375                    if matches!(block_pos, BlockPosition::NextExpected | BlockPosition::NextPartial)
376                    {
377                        break;
378                    }
379                }
380                Ok(Some(Err(e))) => {
381                    // Synchronizer errored during catch up
382                    self.mark_errored(e);
383                    return Ok(None);
384                }
385                Ok(None) => {
386                    // This case should not happen, as we shouldn't poll the synchronizer after we
387                    // closed it or after it errored.
388                    warn!(
389                        %extractor_id,
390                        "Tried to poll from closed synchronizer during catch up.",
391                    );
392                    self.mark_closed();
393                    return Ok(None);
394                }
395                Err(_) => {
396                    debug!(%extractor_id, "Timed out waiting for catch-up");
397                    break;
398                }
399            }
400        }
401
402        let merged = results
403            .into_iter()
404            .reduce(|l, r| l.merge(r));
405
406        if let Some(msg) = merged {
407            // we were able to get at least one block out
408            debug!(%extractor_id, "Delayed extractor made progress!");
409            self.transition(msg.header.clone(), block_history, stale_threshold)?;
410            Ok(Some(msg))
411        } else {
412            // No progress made during catch-up, check if we should go stale
413            self.check_and_transition_to_stale_if_needed(stale_threshold, None)?;
414            Ok(None)
415        }
416    }
417
418    /// Helper method to check if synchronizer should transition to stale based on time elapsed
419    fn check_and_transition_to_stale_if_needed(
420        &mut self,
421        stale_threshold: std::time::Duration,
422        fallback_header: Option<BlockHeader>,
423    ) -> Result<bool, BlockSynchronizerError> {
424        let now = Local::now().naive_utc();
425        let wait_duration = now.signed_duration_since(self.modify_ts);
426        let stale_threshold_chrono = ChronoDuration::from_std(stale_threshold)
427            .map_err(|e| BlockSynchronizerError::DurationConversionError(e.to_string()))?;
428
429        if wait_duration > stale_threshold_chrono {
430            let header_to_use = match (&self.state, fallback_header) {
431                (SynchronizerState::Ready(h), _) |
432                (SynchronizerState::Delayed(h), _) |
433                (SynchronizerState::Stale(h), _) => h.clone(),
434                (_, Some(h)) => h,
435                _ => BlockHeader::default(),
436            };
437
438            warn!(
439                extractor_id=%self.extractor_id,
440                last_message_at=?self.modify_ts,
441                "SynchronizerStream transition to stale due to timeout."
442            );
443            self.state = SynchronizerState::Stale(header_to_use);
444            self.modify_ts = now;
445            Ok(true)
446        } else {
447            Ok(false)
448        }
449    }
450
451    /// Logic to transition a state synchronizer based on newly received block
452    ///
453    /// Updates the synchronizer's state according to the position of the received block:
454    /// - Next expected block -> Ready state
455    /// - Latest/Delayed block -> Either Delayed or Stale (if >60s since last update)
456    /// - Advanced block -> Advanced state (block ahead of expected position)
457    fn transition(
458        &mut self,
459        latest_retrieved: BlockHeader,
460        block_history: &BlockHistory,
461        stale_threshold: std::time::Duration,
462    ) -> Result<(), BlockSynchronizerError> {
463        let extractor_id = self.extractor_id.clone();
464        let last_message_at = self.modify_ts;
465        let block = &latest_retrieved;
466
467        match block_history.determine_block_position(&latest_retrieved)? {
468            BlockPosition::NextExpected | BlockPosition::NextPartial => {
469                self.state = SynchronizerState::Ready(latest_retrieved.clone());
470                trace!(
471                    next = %latest_retrieved,
472                    extractor = %extractor_id,
473                    "SynchronizerStream transition to next expected"
474                )
475            }
476            BlockPosition::Latest | BlockPosition::Delayed => {
477                if !self.check_and_transition_to_stale_if_needed(
478                    stale_threshold,
479                    Some(latest_retrieved.clone()),
480                )? {
481                    warn!(
482                        %extractor_id,
483                        ?last_message_at,
484                        %block,
485                        "SynchronizerStream transition transition to delayed."
486                    );
487                    self.state = SynchronizerState::Delayed(latest_retrieved.clone());
488                }
489            }
490            BlockPosition::Advanced => {
491                info!(
492                    %extractor_id,
493                    ?last_message_at,
494                    latest = opt(&block_history.latest()),
495                    %block,
496                    "SynchronizerStream transition to advanced."
497                );
498                self.state = SynchronizerState::Advanced(latest_retrieved.clone());
499            }
500        }
501        self.modify_ts = Local::now().naive_utc();
502        Ok(())
503    }
504
505    /// Marks this stream as errored
506    ///
507    /// Sets an error and transitions the stream to Ended, correctly recording the
508    /// time at which this happened.
509    fn mark_errored(&mut self, error: SynchronizerError) {
510        self.state = SynchronizerState::Ended(error.to_string());
511        self.modify_ts = Local::now().naive_utc();
512        self.error = Some(error);
513    }
514
515    /// Marks a stream as closed.
516    ///
517    /// If the stream has not been ended previously, e.g. by an error it will be marked
518    /// as Ended without error. This should not happen since we should stop consuming
519    /// from the stream if an error occured.
520    fn mark_closed(&mut self) {
521        if !matches!(self.state, SynchronizerState::Ended(_)) {
522            self.state = SynchronizerState::Ended("Closed".to_string());
523            self.modify_ts = Local::now().naive_utc();
524        }
525    }
526
527    /// Marks a stream as stale.
528    fn mark_stale(&mut self, header: &BlockHeader) {
529        self.state = SynchronizerState::Stale(header.clone());
530        self.modify_ts = Local::now().naive_utc();
531    }
532
533    /// Marks this stream as ready.
534    fn mark_ready(&mut self, header: &BlockHeader) {
535        self.state = SynchronizerState::Ready(header.clone());
536        self.modify_ts = Local::now().naive_utc();
537    }
538
539    fn has_ended(&self) -> bool {
540        matches!(self.state, SynchronizerState::Ended(_))
541    }
542
543    fn is_stale(&self) -> bool {
544        matches!(self.state, SynchronizerState::Stale(_))
545    }
546
547    fn is_advanced(&self) -> bool {
548        matches!(self.state, SynchronizerState::Advanced(_))
549    }
550
551    /// Gets the streams current header from active streams.
552    ///
553    /// A stream is considered as active unless it has ended or is stale.
554    fn get_current_header(&self) -> Option<&BlockHeader> {
555        match &self.state {
556            SynchronizerState::Ready(b) |
557            SynchronizerState::Delayed(b) |
558            SynchronizerState::Advanced(b) => Some(b),
559            _ => None,
560        }
561    }
562}
563
/// A single message emitted by the block synchronizer per round.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct FeedMessage<H = BlockHeader>
where
    H: HeaderLike,
{
    /// State messages keyed by extractor name, for the synchronizers that delivered data
    /// since the previous message.
    pub state_msgs: HashMap<String, StateSyncMessage<H>>,
    /// Current state of every synchronizer, keyed by extractor name.
    pub sync_states: HashMap<String, SynchronizerState>,
}
572
573impl<H> FeedMessage<H>
574where
575    H: HeaderLike,
576{
577    fn new(
578        state_msgs: HashMap<String, StateSyncMessage<H>>,
579        sync_states: HashMap<String, SynchronizerState>,
580    ) -> Self {
581        Self { state_msgs, sync_states }
582    }
583}
584
585impl<S> BlockSynchronizer<S>
586where
587    S: StateSynchronizer,
588{
589    pub fn new(
590        block_time: std::time::Duration,
591        latency_buffer: std::time::Duration,
592        max_missed_blocks: u64,
593    ) -> Self {
594        Self {
595            synchronizers: None,
596            max_messages: None,
597            block_time,
598            latency_buffer,
599            startup_timeout: block_time.mul_f64(max_missed_blocks as f64),
600            max_missed_blocks,
601        }
602    }
603
604    /// Limits the stream to emit a maximum number of messages.
605    ///
606    /// After the stream emitted max messages it will end. This is only useful for
607    /// testing purposes or if you only want to process a fixed amount of messages
608    /// and then terminate cleanly.
609    pub fn max_messages(&mut self, val: usize) {
610        self.max_messages = Some(val);
611    }
612
613    /// Sets timeout for the first message of a protocol.
614    ///
615    /// Time to wait for the full first message, including snapshot retrieval.
616    pub fn startup_timeout(mut self, val: Duration) {
617        self.startup_timeout = val;
618    }
619
620    pub fn register_synchronizer(mut self, id: ExtractorIdentity, synchronizer: S) -> Self {
621        let mut registered = self.synchronizers.unwrap_or_default();
622        registered.insert(id, synchronizer);
623        self.synchronizers = Some(registered);
624        self
625    }
626
627    #[cfg(test)]
628    pub fn with_short_timeouts() -> Self {
629        Self::new(Duration::from_millis(10), Duration::from_millis(10), 3)
630    }
631
632    /// Cleanup function for shutting down remaining synchronizers when the nanny detects an error.
633    /// Sends close signals to all remaining synchronizers and waits for them to complete.
634    async fn cleanup_synchronizers(
635        mut state_sync_tasks: FuturesUnordered<JoinHandle<()>>,
636        sync_close_senders: Vec<oneshot::Sender<()>>,
637    ) {
638        // Send close signals to all remaining synchronizers
639        for close_sender in sync_close_senders {
640            let _ = close_sender.send(());
641        }
642
643        // Await remaining tasks with timeout
644        let mut completed_tasks = 0;
645        while let Ok(Some(_)) = timeout(Duration::from_secs(5), state_sync_tasks.next()).await {
646            completed_tasks += 1;
647        }
648
649        // Warn if any synchronizers timed out during cleanup
650        let remaining_tasks = state_sync_tasks.len();
651        if remaining_tasks > 0 {
652            warn!(
653                completed = completed_tasks,
654                timed_out = remaining_tasks,
655                "Some synchronizers timed out during cleanup and may not have shut down cleanly"
656            );
657        }
658    }
659
660    /// Starts the synchronization of streams.
661    ///
662    /// Will error directly if the startup fails. Once the startup is complete, it will
663    /// communicate any fatal errors through the stream before closing it.
664    pub async fn run(
665        mut self,
666    ) -> BlockSyncResult<(JoinHandle<()>, Receiver<BlockSyncResult<FeedMessage<BlockHeader>>>)>
667    {
668        trace!("Starting BlockSynchronizer...");
669        let state_sync_tasks = FuturesUnordered::new();
670        let mut synchronizers = self
671            .synchronizers
672            .take()
673            .ok_or(BlockSynchronizerError::NoSynchronizers)?;
674        // init synchronizers
675        let init_tasks = synchronizers
676            .values_mut()
677            .map(|s| s.initialize())
678            .collect::<Vec<_>>();
679        try_join_all(init_tasks).await?;
680
681        let mut sync_streams = Vec::with_capacity(synchronizers.len());
682        let mut sync_close_senders = Vec::new();
683        for (extractor_id, synchronizer) in synchronizers.drain() {
684            let (handle, rx) = synchronizer.start().await;
685            let (join_handle, close_sender) = handle.split();
686            state_sync_tasks.push(join_handle);
687            sync_close_senders.push(close_sender);
688
689            sync_streams.push(SynchronizerStream::new(&extractor_id, rx));
690        }
691
692        // startup, schedule first set of futures and wait for them to return to initialise
693        // synchronizers.
694        debug!("Waiting for initial synchronizer messages...");
695        let mut startup_futures = Vec::new();
696        for synchronizer in sync_streams.iter_mut() {
697            let fut = async {
698                let res = timeout(self.startup_timeout, synchronizer.rx.recv()).await;
699                (synchronizer, res)
700            };
701            startup_futures.push(fut);
702        }
703
704        let mut ready_sync_msgs = HashMap::new();
705        let initial_headers = join_all(startup_futures)
706            .await
707            .into_iter()
708            .filter_map(|(synchronizer, res)| {
709                let extractor_id = synchronizer.extractor_id.clone();
710                match res {
711                    Ok(Some(Ok(msg))) => {
712                        debug!(%extractor_id, height=?&msg.header.number, "Synchronizer started successfully!");
713                        // initially default all synchronizers to Ready
714                        synchronizer.mark_ready(&msg.header);
715                        let header = msg.header.clone();
716                        ready_sync_msgs.insert(extractor_id.name.clone(), msg);
717                        Some(header)
718                    }
719                    Ok(Some(Err(e))) => {
720                        synchronizer.mark_errored(e);
721                        None
722                    }
723                    Ok(None) => {
724                        // Synchronizer closed channel. This can only happen if the run
725                        // task ended, before this, the synchronizer should have sent
726                        // an error, so this case we likely don't have to handle that
727                        // explicitly
728                        warn!(%extractor_id, "Synchronizer closed during startup");
729                        synchronizer.mark_closed();
730                        None
731                    }
732                    Err(_) => {
733                        // We got an error because the synchronizer timed out during startup
734                        warn!(%extractor_id, "Timed out waiting for first message");
735                        synchronizer.mark_stale(&BlockHeader::default());
736                        None
737                    }
738                }
739            })
740            .collect::<HashSet<_>>() // remove duplicates
741            .into_iter()
742            .collect::<Vec<_>>();
743
744        // Ensures we have at least one ready stream
745        Self::check_streams(&sync_streams)?;
746        let mut block_history = BlockHistory::new(initial_headers, 15)?;
747        // Determine the starting header for synchronization
748        let start_header = block_history
749            .latest()
750            // Safe, as we have checked this invariant with `check_streams`
751            .ok_or(BlockHistoryError::EmptyHistory)?;
752        info!(
753            start_block=%start_header,
754            n_healthy=ready_sync_msgs.len(),
755            n_total=sync_streams.len(),
756            "Block synchronization started successfully!"
757        );
758
759        // Determine correct state for each remaining synchronizer, based on their header vs the
760        // latest one
761        for stream in sync_streams.iter_mut() {
762            if let SynchronizerState::Ready(header) = stream.state.clone() {
763                if header.number < start_header.number {
764                    debug!(
765                        extractor_id=%stream.extractor_id,
766                        synchronizer_block=header.number,
767                        current_block=start_header.number,
768                        "Marking synchronizer as delayed during initialization"
769                    );
770                    stream.state = SynchronizerState::Delayed(header);
771                }
772            }
773        }
774
775        let (sync_tx, sync_rx) = mpsc::channel(30);
776        let main_loop_jh = tokio::spawn(async move {
777            let mut n_iter = 1;
778            loop {
779                // Send retrieved data to receivers.
780                let msg = FeedMessage::new(
781                    std::mem::take(&mut ready_sync_msgs),
782                    sync_streams
783                        .iter()
784                        .map(|stream| (stream.extractor_id.name.to_string(), stream.state.clone()))
785                        .collect(),
786                );
787                if sync_tx.send(Ok(msg)).await.is_err() {
788                    info!("Receiver closed, block synchronizer terminating..");
789                    return;
790                };
791
792                // Check if we have reached the max messages
793                if let Some(max_messages) = self.max_messages {
794                    if n_iter >= max_messages {
795                        info!(max_messages, "StreamEnd");
796                        return;
797                    }
798                }
799                n_iter += 1;
800
801                let res = self
802                    .handle_next_message(
803                        &mut sync_streams,
804                        &mut ready_sync_msgs,
805                        &mut block_history,
806                    )
807                    .await;
808
809                if let Err(e) = res {
810                    // Communicate error to clients, then end the loop
811                    let _ = sync_tx.send(Err(e)).await;
812                    return;
813                }
814            }
815        });
816
817        // We await the main loop and log any panics (should be impossible). If the
818        // main loop exits, all synchronizers should be ended or stale. So we kill any
819        // remaining stale ones just in case. A final error is propagated through the
820        // channel to the user.
821        let nanny_jh = tokio::spawn(async move {
822            // report any panics
823            let _ = main_loop_jh.await.map_err(|e| {
824                if e.is_panic() {
825                    error!("BlockSynchornizer main loop panicked: {e}")
826                }
827            });
828            debug!("Main loop exited. Closing synchronizers");
829            Self::cleanup_synchronizers(state_sync_tasks, sync_close_senders).await;
830            debug!("Shutdown complete");
831        });
832        Ok((nanny_jh, sync_rx))
833    }
834
835    /// Retrieves next message from synchronizers
836    ///
837    /// The result is written into `ready_sync_messages`. Errors only if there is a
838    /// non-recoverable error or all synchronizers have ended.
839    async fn handle_next_message(
840        &self,
841        sync_streams: &mut [SynchronizerStream],
842        ready_sync_msgs: &mut HashMap<String, StateSyncMessage<BlockHeader>>,
843        block_history: &mut BlockHistory,
844    ) -> BlockSyncResult<()> {
845        let mut recv_futures = Vec::new();
846        for stream in sync_streams.iter_mut() {
847            // If stream is in ended state, do not check for any messages (it's receiver
848            // is closed), but do check stale streams.
849            if stream.has_ended() {
850                continue;
851            }
852            // Here we simply wait block_time + max_wait. This will not work for chains with
853            // unknown block times but is simple enough for now.
854            // If we would like to support unknown block times we could: Instruct all handles to
855            // await the max block time, if a header arrives within that time transition as
856            // usual, but via a select statement get notified (using e.g. Notify) if any other
857            // handle finishes before the timeout. Then await again but this time only for
858            // max_wait and then proceed as usual. So basically each try_advance task would have
859            // a select statement that allows it to exit the first timeout preemptively if any
860            // other try_advance task finished earlier.
861            recv_futures.push(async {
862                let res = stream
863                    .try_advance(
864                        block_history,
865                        self.block_time,
866                        self.latency_buffer,
867                        self.block_time
868                            .mul_f64(self.max_missed_blocks as f64),
869                    )
870                    .await?;
871                Ok::<_, BlockSynchronizerError>(
872                    res.map(|msg| (stream.extractor_id.name.clone(), msg)),
873                )
874            });
875        }
876        ready_sync_msgs.extend(
877            join_all(recv_futures)
878                .await
879                .into_iter()
880                .collect::<Result<Vec<_>, _>>()?
881                .into_iter()
882                .flatten(),
883        );
884
885        // Check if we have any active synchronizers (Ready, Delayed, or Advanced)
886        // If all synchronizers have been purged (Stale/Ended), exit the main loop
887        Self::check_streams(sync_streams)?;
888
889        // if we have any advanced header, we reinit the block history,
890        // else we simply advance the existing history
891        if sync_streams
892            .iter()
893            .any(SynchronizerStream::is_advanced)
894        {
895            *block_history = Self::reinit_block_history(sync_streams, block_history)?;
896        } else {
897            let header = sync_streams
898                .iter()
899                .filter_map(SynchronizerStream::get_current_header)
900                .max_by_key(|b| b.number)
901                // Safe, as we have checked this invariant with `check_streams`
902                .ok_or(BlockSynchronizerError::NoReadySynchronizers(
903                    "Expected to have at least one synchronizer that is not stale or ended"
904                        .to_string(),
905                ))?;
906            block_history.push(header.clone())?;
907        }
908        Ok(())
909    }
910
911    /// Reinitialise block history and reclassifies active synchronizers states.
912    ///
913    /// We call this if we detect a future detached block. This usually only happens if
914    /// a synchronizer has a restart.
915    ///
916    /// ## Note
917    /// This method assumes that at least one synchronizer is in Advanced, Ready or
918    /// Delayed state, it will return an error in case this is not the case.
919    fn reinit_block_history(
920        sync_streams: &mut [SynchronizerStream],
921        block_history: &mut BlockHistory,
922    ) -> Result<BlockHistory, BlockSynchronizerError> {
923        let previous = block_history
924            .latest()
925            // Old block history should not be empty, startup should have populated it at this point
926            .ok_or(BlockHistoryError::EmptyHistory)?;
927        let blocks = sync_streams
928            .iter()
929            .filter_map(SynchronizerStream::get_current_header)
930            .cloned()
931            .collect();
932        let new_block_history = BlockHistory::new(blocks, 10)?;
933        let latest = new_block_history
934            .latest()
935            // Block history should not be empty, we just populated it.
936            .ok_or(BlockHistoryError::EmptyHistory)?;
937        info!(
938             %previous,
939            %latest,
940            "Advanced synchronizer detected. Reinitialized block history."
941        );
942        sync_streams
943            .iter_mut()
944            .for_each(|stream| {
945                // we only get headers from advanced, ready and delayed so stale
946                // or ended streams are not considered here
947                if let Some(header) = stream.get_current_header() {
948                    if header.number < latest.number {
949                        stream.state = SynchronizerState::Delayed(header.clone());
950                    } else if header.number == latest.number {
951                        stream.state = SynchronizerState::Ready(header.clone());
952                    }
953                }
954            });
955        Ok(new_block_history)
956    }
957
958    /// Checks if we still have at least one active stream else errors.
959    ///
960    /// If there are no active streams meaning all are ended or stale, it returns a
961    /// summary error message for the state of all synchronizers.
962    fn check_streams(sync_streams: &[SynchronizerStream]) -> BlockSyncResult<()> {
963        let mut latest_errored_stream: Option<&SynchronizerStream> = None;
964
965        for stream in sync_streams.iter() {
966            // If we have at least one active stream, we can return early
967            if !stream.has_ended() && !stream.is_stale() {
968                return Ok(());
969            }
970
971            // Otherwise, we track the latest errored stream for reporting
972            if latest_errored_stream.is_none() ||
973                stream.modify_ts >
974                    latest_errored_stream
975                        .as_ref()
976                        .unwrap()
977                        .modify_ts
978            {
979                latest_errored_stream = Some(stream);
980            }
981        }
982
983        let last_error_reason = if let Some(stream) = latest_errored_stream {
984            if let Some(err) = &stream.error {
985                format!("Synchronizer for {} errored with: {err}", stream.extractor_id)
986            } else {
987                format!("Synchronizer for {} became: {}", stream.extractor_id, stream.state)
988            }
989        } else {
990            return Err(BlockSynchronizerError::NoSynchronizers);
991        };
992
993        let mut reason = vec![last_error_reason];
994
995        sync_streams.iter().for_each(|stream| {
996            reason.push(format!(
997                "{} reported as {} at {}",
998                stream.extractor_id, stream.state, stream.modify_ts
999            ))
1000        });
1001
1002        Err(BlockSynchronizerError::NoReadySynchronizers(reason.join(", ")))
1003    }
1004}
1005
1006#[cfg(test)]
1007mod tests {
1008    use std::sync::Arc;
1009
1010    use async_trait::async_trait;
1011    use test_log::test;
1012    use tokio::sync::{oneshot, Mutex};
1013    use tycho_common::dto::Chain;
1014
1015    use super::*;
1016    use crate::feed::synchronizer::{SyncResult, SynchronizerTaskHandle};
1017
1018    #[derive(Clone, Debug)]
1019    enum MockBehavior {
1020        Normal,          // Exit successfully when receiving close signal
1021        IgnoreClose,     // Ignore close signals and hang (for timeout testing)
1022        ExitImmediately, // Exit immediately after first message (for quick failure testing)
1023    }
1024
1025    type HeaderReceiver = Receiver<SyncResult<StateSyncMessage<BlockHeader>>>;
1026
1027    #[derive(Clone)]
1028    struct MockStateSync {
1029        header_tx: mpsc::Sender<SyncResult<StateSyncMessage<BlockHeader>>>,
1030        header_rx: Arc<Mutex<Option<HeaderReceiver>>>,
1031        close_received: Arc<Mutex<bool>>,
1032        behavior: MockBehavior,
1033        // For testing: store the close sender so tests can trigger close signals
1034        close_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
1035    }
1036
1037    impl MockStateSync {
1038        fn new() -> Self {
1039            Self::with_behavior(MockBehavior::Normal)
1040        }
1041
1042        fn with_behavior(behavior: MockBehavior) -> Self {
1043            let (tx, rx) = mpsc::channel(1);
1044            Self {
1045                header_tx: tx,
1046                header_rx: Arc::new(Mutex::new(Some(rx))),
1047                close_received: Arc::new(Mutex::new(false)),
1048                behavior,
1049                close_tx: Arc::new(Mutex::new(None)),
1050            }
1051        }
1052
1053        async fn was_close_received(&self) -> bool {
1054            *self.close_received.lock().await
1055        }
1056
1057        async fn send_header(&self, header: StateSyncMessage<BlockHeader>) -> Result<(), String> {
1058            self.header_tx
1059                .send(Ok(header))
1060                .await
1061                .map_err(|e| format!("sending header failed: {e}"))
1062        }
1063
1064        // For testing: trigger a close signal to make the synchronizer exit
1065        async fn trigger_close(&self) {
1066            if let Some(close_tx) = self.close_tx.lock().await.take() {
1067                let _ = close_tx.send(());
1068            }
1069        }
1070    }
1071
    /// `StateSynchronizer` implementation whose spawned task is driven by `MockBehavior`.
    #[async_trait]
    impl StateSynchronizer for MockStateSync {
        /// No-op initialization; always succeeds.
        async fn initialize(&mut self) -> SyncResult<()> {
            Ok(())
        }

        /// Hands out the parked header receiver and spawns the mock task.
        ///
        /// Two oneshot close channels are created: one sender goes into the returned
        /// `SynchronizerTaskHandle`, the other is stored in `close_tx` for `trigger_close`.
        ///
        /// # Panics
        /// Panics if called twice on mocks sharing the same receiver slot, because the
        /// receiver has already been taken.
        async fn start(
            mut self,
        ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>) {
            // Take the receiver out of its slot; the guard is released at the end of this block.
            let block_rx = {
                let mut guard = self.header_rx.lock().await;
                guard
                    .take()
                    .expect("Block receiver was not set!")
            };

            // Create close channel - we need to store one sender for testing and give one to the
            // handle
            let (close_tx_for_handle, close_rx) = oneshot::channel();
            let (close_tx_for_test, close_rx_for_test) = oneshot::channel();

            // Store the test close sender
            {
                let mut guard = self.close_tx.lock().await;
                *guard = Some(close_tx_for_test);
            }

            // Clone what the spawned task needs so it does not borrow `self`.
            let behavior = self.behavior.clone();
            let close_received_clone = self.close_received.clone();
            let tx = self.header_tx.clone();

            let jh = tokio::spawn(async move {
                match behavior {
                    MockBehavior::IgnoreClose => {
                        // Infinite loop to simulate a hung synchronizer that doesn't respond to
                        // close signals
                        loop {
                            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
                        }
                    }
                    MockBehavior::ExitImmediately => {
                        // Push a connection error onto the header channel and end the task, to
                        // simulate an immediate task failure
                        tx.send(SyncResult::Err(SynchronizerError::ConnectionError(
                            "Simulated immediate task failure".to_string(),
                        )))
                        .await
                        .unwrap();
                    }
                    MockBehavior::Normal => {
                        // Wait for a close signal from either the handle or the test (a dropped
                        // sender also completes the wait), then record that it was received
                        let _ = tokio::select! {
                            result = close_rx => result,
                            result = close_rx_for_test => result,
                        };
                        let mut guard = close_received_clone.lock().await;
                        *guard = true;
                    }
                }
            });

            let handle = SynchronizerTaskHandle::new(jh, close_tx_for_handle);
            (handle, block_rx)
        }
    }
1137
1138    fn header_message(block: u8) -> StateSyncMessage<BlockHeader> {
1139        StateSyncMessage {
1140            header: BlockHeader {
1141                number: block as u64,
1142                hash: Bytes::from(vec![block]),
1143                parent_hash: Bytes::from(vec![block - 1]),
1144                revert: false,
1145                timestamp: 1000,
1146                partial_block_index: None,
1147            },
1148            ..Default::default()
1149        }
1150    }
1151
1152    /// Creates a partial block header message with an ephemeral hash encoding block number and
1153    /// partial index.
1154    fn partial_header_message(block: u8, partial_idx: u32) -> StateSyncMessage<BlockHeader> {
1155        // Ephemeral hash encodes block number and partial index for uniqueness
1156        let hash_bytes =
1157            [(block as u64).to_be_bytes().as_slice(), partial_idx.to_be_bytes().as_slice()]
1158                .concat();
1159        StateSyncMessage {
1160            header: BlockHeader {
1161                number: block as u64,
1162                hash: Bytes::from(hash_bytes),
1163                parent_hash: Bytes::from(vec![block - 1]),
1164                revert: false,
1165                timestamp: 1000,
1166                partial_block_index: Some(partial_idx),
1167            },
1168            ..Default::default()
1169        }
1170    }
1171
1172    fn revert_header_message(block: u8) -> StateSyncMessage<BlockHeader> {
1173        StateSyncMessage {
1174            header: BlockHeader {
1175                number: block as u64,
1176                hash: Bytes::from(vec![block]),
1177                parent_hash: Bytes::from(vec![block - 1]),
1178                revert: true,
1179                timestamp: 1000,
1180                partial_block_index: None,
1181            },
1182            ..Default::default()
1183        }
1184    }
1185
1186    async fn receive_message(rx: &mut Receiver<BlockSyncResult<FeedMessage>>) -> FeedMessage {
1187        timeout(Duration::from_millis(100), rx.recv())
1188            .await
1189            .expect("Responds in time")
1190            .expect("Should receive first message")
1191            .expect("No error")
1192    }
1193
1194    async fn setup_block_sync(
1195    ) -> (MockStateSync, MockStateSync, JoinHandle<()>, Receiver<BlockSyncResult<FeedMessage>>)
1196    {
1197        setup_block_sync_with_behaviour(MockBehavior::Normal, MockBehavior::Normal).await
1198    }
1199
1200    // Starts up a synchronizer and consumes the first message on block 1.
1201    async fn setup_block_sync_with_behaviour(
1202        v2_behavior: MockBehavior,
1203        v3_behavior: MockBehavior,
1204    ) -> (MockStateSync, MockStateSync, JoinHandle<()>, Receiver<BlockSyncResult<FeedMessage>>)
1205    {
1206        let v2_sync = MockStateSync::with_behavior(v2_behavior);
1207        let v3_sync = MockStateSync::with_behavior(v3_behavior);
1208
1209        // Use reasonable timeouts to observe proper state transitions
1210        let mut block_sync = BlockSynchronizer::new(
1211            Duration::from_millis(20), // block_time
1212            Duration::from_millis(10), // max_wait
1213            3,                         // max_missed_blocks (stale threshold = 20ms * 3 = 60ms)
1214        );
1215        block_sync.max_messages(10); // Allow enough messages to see the progression
1216
1217        let block_sync = block_sync
1218            .register_synchronizer(
1219                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
1220                v2_sync.clone(),
1221            )
1222            .register_synchronizer(
1223                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
1224                v3_sync.clone(),
1225            );
1226
1227        // Send initial messages to both synchronizers
1228        let block1_msg = header_message(1);
1229        let _ = v2_sync
1230            .send_header(block1_msg.clone())
1231            .await;
1232        let _ = v3_sync
1233            .send_header(block1_msg.clone())
1234            .await;
1235
1236        // Start the block synchronizer
1237        let (nanny_handle, mut rx) = block_sync
1238            .run()
1239            .await
1240            .expect("BlockSynchronizer failed to start");
1241
1242        let first_feed_msg = receive_message(&mut rx).await;
1243        assert_eq!(first_feed_msg.state_msgs.len(), 2);
1244        assert!(matches!(
1245            first_feed_msg
1246                .sync_states
1247                .get("uniswap-v2")
1248                .unwrap(),
1249            SynchronizerState::Ready(_)
1250        ));
1251        assert!(matches!(
1252            first_feed_msg
1253                .sync_states
1254                .get("uniswap-v3")
1255                .unwrap(),
1256            SynchronizerState::Ready(_)
1257        ));
1258
1259        (v2_sync, v3_sync, nanny_handle, rx)
1260    }
1261
1262    async fn shutdown_block_synchronizer(
1263        v2_sync: &MockStateSync,
1264        v3_sync: &MockStateSync,
1265        nanny_handle: JoinHandle<()>,
1266    ) {
1267        v3_sync.trigger_close().await;
1268        v2_sync.trigger_close().await;
1269
1270        timeout(Duration::from_millis(100), nanny_handle)
1271            .await
1272            .expect("Nanny failed to exit within time")
1273            .expect("Nanny panicked");
1274    }
1275
1276    /// Send message to sync and assert it transitions to Ready at expected block/partial
1277    async fn send_and_assert_ready(
1278        sync: &MockStateSync,
1279        sync_name: &str,
1280        rx: &mut Receiver<BlockSyncResult<FeedMessage>>,
1281        msg: StateSyncMessage<BlockHeader>,
1282        expected_block: u64,
1283        expected_partial: Option<u32>,
1284    ) {
1285        sync.send_header(msg)
1286            .await
1287            .expect("send failed");
1288        let feed_msg = receive_message(rx).await;
1289        let state = feed_msg
1290            .sync_states
1291            .get(sync_name)
1292            .unwrap();
1293        match state {
1294            SynchronizerState::Ready(h) => {
1295                assert_eq!(h.number, expected_block, "wrong block number");
1296                assert_eq!(h.partial_block_index, expected_partial, "wrong partial index");
1297            }
1298            other => panic!("expected Ready, got {:?}", other),
1299        }
1300    }
1301
    /// Both synchronizers receive block 2. The next feed message must carry both state
    /// messages and report both synchronizers as `Ready` on block 2.
    #[test(tokio::test)]
    async fn test_two_ready_synchronizers() {
        let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;

        // Feed block 2 to both synchronizers.
        let second_msg = header_message(2);
        v2_sync
            .send_header(second_msg.clone())
            .await
            .expect("send_header failed");
        v3_sync
            .send_header(second_msg.clone())
            .await
            .expect("send_header failed");
        let second_feed_msg = receive_message(&mut rx).await;

        // Expected feed message: both state messages and both synchronizers Ready on block 2.
        let exp2 = FeedMessage {
            state_msgs: [
                ("uniswap-v2".to_string(), second_msg.clone()),
                ("uniswap-v3".to_string(), second_msg.clone()),
            ]
            .into_iter()
            .collect(),
            sync_states: [
                ("uniswap-v3".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
                ("uniswap-v2".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
            ]
            .into_iter()
            .collect(),
        };
        assert_eq!(second_feed_msg, exp2);

        shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
    }
1335
1336    #[test(tokio::test)]
1337    async fn test_delayed_synchronizer_catches_up() {
1338        let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1339
1340        // Send block 2 to v2 synchronizer only
1341        let block2_msg = header_message(2);
1342        v2_sync
1343            .send_header(block2_msg.clone())
1344            .await
1345            .expect("send_header failed");
1346
1347        // Consume second message - v3 should be delayed
1348        let second_feed_msg = receive_message(&mut rx).await;
1349        debug!("Consumed second message for v2");
1350
1351        assert!(second_feed_msg
1352            .state_msgs
1353            .contains_key("uniswap-v2"));
1354        assert!(matches!(
1355            second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1356            SynchronizerState::Ready(header) if header.number == 2
1357        ));
1358        assert!(!second_feed_msg
1359            .state_msgs
1360            .contains_key("uniswap-v3"));
1361        assert!(matches!(
1362            second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1363            SynchronizerState::Delayed(header) if header.number == 1
1364        ));
1365
1366        // Now v3 catches up to block 2
1367        v3_sync
1368            .send_header(block2_msg.clone())
1369            .await
1370            .expect("send_header failed");
1371
1372        // Both advance to block 3
1373        let block3_msg = header_message(3);
1374        v2_sync
1375            .send_header(block3_msg.clone())
1376            .await
1377            .expect("send_header failed");
1378        v3_sync
1379            .send_header(block3_msg)
1380            .await
1381            .expect("send_header failed");
1382
1383        // Consume messages until we get both synchronizers on block 3
1384        // We may get an intermediate message for v3's catch-up or a combined message
1385        let mut third_feed_msg = receive_message(&mut rx).await;
1386
1387        // If this message doesn't have both univ2, it's an intermediate message, so we get the next
1388        // one
1389        if !third_feed_msg
1390            .state_msgs
1391            .contains_key("uniswap-v2")
1392        {
1393            third_feed_msg = rx
1394                .recv()
1395                .await
1396                .expect("header channel was closed")
1397                .expect("no error");
1398        }
1399        assert!(third_feed_msg
1400            .state_msgs
1401            .contains_key("uniswap-v2"));
1402        assert!(third_feed_msg
1403            .state_msgs
1404            .contains_key("uniswap-v3"));
1405        assert!(matches!(
1406            third_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1407            SynchronizerState::Ready(header) if header.number == 3
1408        ));
1409        assert!(matches!(
1410            third_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1411            SynchronizerState::Ready(header) if header.number == 3
1412        ));
1413
1414        shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1415    }
1416
    /// Synchronizers start on different blocks: v2 on block 1, v3 on block 2.
    ///
    /// The first feed message must report v2 as `Delayed` on block 1 and v3 as `Ready` on
    /// block 2. After v2 catches up and both receive block 3, both must be `Ready` on
    /// block 3.
    #[test(tokio::test)]
    async fn test_different_start_blocks() {
        let v2_sync = MockStateSync::new();
        let v3_sync = MockStateSync::new();
        let block_sync = BlockSynchronizer::with_short_timeouts()
            .register_synchronizer(
                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
                v2_sync.clone(),
            )
            .register_synchronizer(
                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
                v3_sync.clone(),
            );

        // Initial messages - synchronizers at different blocks
        let block1_msg = header_message(1);
        let block2_msg = header_message(2);

        let _ = v2_sync
            .send_header(block1_msg.clone())
            .await;
        v3_sync
            .send_header(block2_msg.clone())
            .await
            .expect("send_header failed");

        // Start the block synchronizer - it should use block 2 as the starting block
        let (jh, mut rx) = block_sync
            .run()
            .await
            .expect("BlockSynchronizer failed to start.");

        // Consume first message
        let first_feed_msg = receive_message(&mut rx).await;
        assert!(matches!(
            first_feed_msg.sync_states.get("uniswap-v2").unwrap(),
            SynchronizerState::Delayed(header) if header.number == 1
        ));
        assert!(matches!(
            first_feed_msg.sync_states.get("uniswap-v3").unwrap(),
            SynchronizerState::Ready(header) if header.number == 2
        ));

        // Now v2 catches up to block 2
        v2_sync
            .send_header(block2_msg.clone())
            .await
            .expect("send_header failed");

        // Both advance to block 3
        let block3_msg = header_message(3);
        let _ = v2_sync
            .send_header(block3_msg.clone())
            .await;
        v3_sync
            .send_header(block3_msg.clone())
            .await
            .expect("send_header failed");

        // Consume the next feed message - both should be on block 3
        let second_feed_msg = receive_message(&mut rx).await;
        assert_eq!(second_feed_msg.state_msgs.len(), 2);
        assert!(matches!(
            second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
            SynchronizerState::Ready(header) if header.number == 3
        ));
        assert!(matches!(
            second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
            SynchronizerState::Ready(header) if header.number == 3
        ));

        shutdown_block_synchronizer(&v2_sync, &v3_sync, jh).await;
    }
1490
    /// v2's task fails right after startup (`ExitImmediately`), while v3 stays silent.
    ///
    /// Checks three things:
    /// - every successful feed message reports v3 as `Delayed` and v2 as `Ended`;
    /// - an error is eventually reported on the feed;
    /// - the nanny finishes cleanup, closing v3.
    #[test(tokio::test)]
    async fn test_synchronizer_fails_other_goes_stale() {
        let (_v2_sync, v3_sync, nanny_handle, mut sync_rx) =
            setup_block_sync_with_behaviour(MockBehavior::ExitImmediately, MockBehavior::Normal)
                .await;

        // Read up to three further feed items; the loop also stops consuming once the
        // channel is closed (`recv` yields `None`).
        let mut error_reported = false;
        for _ in 0..3 {
            if let Some(msg) = sync_rx.recv().await {
                match msg {
                    Err(_) => error_reported = true,
                    Ok(msg) => {
                        assert!(matches!(
                            msg.sync_states
                                .get("uniswap-v3")
                                .unwrap(),
                            SynchronizerState::Delayed(_)
                        ));
                        assert!(matches!(
                            msg.sync_states
                                .get("uniswap-v2")
                                .unwrap(),
                            SynchronizerState::Ended(_)
                        ));
                    }
                }
            }
        }
        assert!(error_reported, "BlockSynchronizer did not report final error");

        // Wait for nanny to detect task failure and execute cleanup
        let result = timeout(Duration::from_secs(2), nanny_handle).await;
        assert!(result.is_ok(), "Nanny should complete when synchronizer task exits");

        // Verify that the remaining synchronizer received close signal during cleanup
        assert!(
            v3_sync.was_close_received().await,
            "v3_sync should have received close signal during cleanup"
        );
    }
1531
    /// v2 fails immediately and v3 ignores close signals (`IgnoreClose`). The nanny must still
    /// complete within 10 seconds.
    #[test(tokio::test)]
    async fn test_cleanup_timeout_warning() {
        // Exercise cleanup with a synchronizer (v3) that never reacts to its close signal;
        // cleanup_synchronizers presumably times out on it and is expected to log a warning
        // (not asserted here). `_rx` keeps the feed receiver alive until the end of the test.
        let (_v2_sync, _v3_sync, nanny_handle, _rx) = setup_block_sync_with_behaviour(
            MockBehavior::ExitImmediately,
            MockBehavior::IgnoreClose,
        )
        .await;

        // Wait for nanny to complete - cleanup should timeout on v3_sync but still complete
        let result = timeout(Duration::from_secs(10), nanny_handle).await;
        assert!(
            result.is_ok(),
            "Nanny should complete even when some synchronizers timeout during cleanup"
        );

        // Note: the warning itself is not asserted, since no log output is captured for
        // inspection here. This test only verifies that cleanup completes even when a
        // synchronizer ignores its close signal.
    }
1553
1554    #[test(tokio::test)]
1555    async fn test_one_synchronizer_goes_stale_while_other_works() {
1556        // Test Case 1: One protocol goes stale and is removed while another protocol works normally
1557        let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1558
1559        // Send block 2 only to v3, v2 will timeout and become delayed
1560        let block2_msg = header_message(2);
1561        let _ = v3_sync
1562            .send_header(block2_msg.clone())
1563            .await;
1564        // Don't send to v2_sync - it will timeout
1565
1566        // Consume second message - v2 should be delayed, v3 ready
1567        let second_feed_msg = receive_message(&mut rx).await;
1568        assert!(second_feed_msg
1569            .state_msgs
1570            .contains_key("uniswap-v3"));
1571        assert!(!second_feed_msg
1572            .state_msgs
1573            .contains_key("uniswap-v2"));
1574        assert!(matches!(
1575            second_feed_msg
1576                .sync_states
1577                .get("uniswap-v3")
1578                .unwrap(),
1579            SynchronizerState::Ready(_)
1580        ));
1581        // v2 should be delayed (if still present) - check nanny is still running
1582        if let Some(v2_state) = second_feed_msg
1583            .sync_states
1584            .get("uniswap-v2")
1585        {
1586            if matches!(v2_state, SynchronizerState::Delayed(_)) {
1587                // Verify nanny is still running when synchronizer is just delayed
1588                assert!(
1589                    !nanny_handle.is_finished(),
1590                    "Nanny should still be running when synchronizer is delayed (not stale yet)"
1591                );
1592            }
1593        }
1594
1595        // Wait a bit, then continue sending blocks to v3 but not v2
1596        tokio::time::sleep(Duration::from_millis(15)).await;
1597
1598        // Continue sending blocks only to v3 to keep it healthy while v2 goes stale
1599        let block3_msg = header_message(3);
1600        let _ = v3_sync
1601            .send_header(block3_msg.clone())
1602            .await;
1603
1604        tokio::time::sleep(Duration::from_millis(40)).await;
1605
1606        let mut stale_found = false;
1607        for _ in 0..2 {
1608            if let Some(Ok(msg)) = rx.recv().await {
1609                if let Some(SynchronizerState::Stale(_)) = msg.sync_states.get("uniswap-v2") {
1610                    stale_found = true;
1611                }
1612            }
1613        }
1614        assert!(stale_found, "v2 synchronizer should be stale");
1615
1616        shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1617    }
1618
1619    #[test(tokio::test)]
1620    async fn test_all_synchronizers_go_stale_main_loop_exits() {
1621        // Test Case 2: All protocols go stale and main loop exits gracefully
1622        let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1623
1624        // Stop sending messages to both synchronizers - they should both timeout and go stale
1625        // Don't send any more messages, let them timeout and become delayed, then stale
1626
1627        // Monitor the state transitions to ensure proper delayed -> stale progression
1628        let mut seen_delayed = false;
1629
1630        // Consume messages and track state transitions
1631        // Give enough time for the synchronizers to transition through states
1632        let timeout_duration = Duration::from_millis(500); // Generous timeout
1633        let start_time = tokio::time::Instant::now();
1634
1635        while let Ok(Some(Ok(msg))) =
1636            tokio::time::timeout(Duration::from_millis(50), rx.recv()).await
1637        {
1638            // Track when synchronizers transition to delayed
1639            if !seen_delayed {
1640                let v2_state = msg.sync_states.get("uniswap-v2");
1641                let v3_state = msg.sync_states.get("uniswap-v3");
1642
1643                if matches!(v2_state, Some(SynchronizerState::Delayed(_))) ||
1644                    matches!(v3_state, Some(SynchronizerState::Delayed(_)))
1645                {
1646                    seen_delayed = true;
1647                    // Verify nanny is still running when synchronizers are just delayed
1648                    assert!(!nanny_handle.is_finished(),
1649                        "Nanny should still be running when synchronizers are delayed (not stale yet)");
1650                    // Once we've seen delayed and verified nanny is running, we can break
1651                    break;
1652                }
1653            }
1654
1655            // Safety timeout to avoid infinite loop
1656            if start_time.elapsed() > timeout_duration {
1657                break;
1658            }
1659        }
1660        // Verify that synchronizers went through proper state transitions
1661        assert!(seen_delayed, "Synchronizers should transition to Delayed state first");
1662
1663        let mut error_reported = false;
1664        // Consume any remaining messages until channel closes
1665        while let Some(msg) = rx.recv().await {
1666            if let Err(e) = msg {
1667                assert!(e
1668                    .to_string()
1669                    .contains("became: Stale(1)"));
1670                assert!(e
1671                    .to_string()
1672                    .contains("reported as Stale(1)"));
1673                error_reported = true;
1674            }
1675        }
1676        assert!(error_reported, "Expected the channel to report an error before closing");
1677
1678        // The nanny should complete when the main loop exits due to no ready synchronizers
1679        let nanny_result = timeout(Duration::from_secs(2), nanny_handle).await;
1680        assert!(nanny_result.is_ok(), "Nanny should complete when main loop exits");
1681
1682        // Verify cleanup was triggered for both synchronizers
1683        assert!(
1684            v2_sync.was_close_received().await,
1685            "v2_sync should have received close signal during cleanup"
1686        );
1687        assert!(
1688            v3_sync.was_close_received().await,
1689            "v3_sync should have received close signal during cleanup"
1690        );
1691    }
1692
1693    #[test(tokio::test)]
1694    async fn test_stale_synchronizer_recovers() {
1695        // Test Case 2: All protocols go stale and main loop exits gracefully
1696        let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1697
1698        // Send second messages to v2 only, shortly before both would go stale
1699        tokio::time::sleep(Duration::from_millis(50)).await;
1700        let block2_msg = header_message(2);
1701        let _ = v2_sync
1702            .send_header(block2_msg.clone())
1703            .await;
1704
1705        // we should get two messages here
1706        for _ in 0..2 {
1707            if let Some(msg) = rx.recv().await {
1708                if let Ok(msg) = msg {
1709                    if matches!(
1710                        msg.sync_states
1711                            .get("uniswap-v2")
1712                            .unwrap(),
1713                        SynchronizerState::Ready(_)
1714                    ) {
1715                        assert!(matches!(
1716                            msg.sync_states
1717                                .get("uniswap-v3")
1718                                .unwrap(),
1719                            SynchronizerState::Delayed(_)
1720                        ));
1721                        break;
1722                    };
1723                }
1724            } else {
1725                panic!("Channel closed unexpectedly")
1726            }
1727        }
1728
1729        // Now v3 should be stale
1730        tokio::time::sleep(Duration::from_millis(15)).await;
1731        let block3_msg = header_message(3);
1732        let _ = v2_sync
1733            .send_header(block3_msg.clone())
1734            .await;
1735        let third_msg = receive_message(&mut rx).await;
1736        dbg!(&third_msg);
1737        assert!(matches!(
1738            third_msg
1739                .sync_states
1740                .get("uniswap-v2")
1741                .unwrap(),
1742            SynchronizerState::Ready(_)
1743        ));
1744        assert!(matches!(
1745            third_msg
1746                .sync_states
1747                .get("uniswap-v3")
1748                .unwrap(),
1749            SynchronizerState::Stale(_)
1750        ));
1751
1752        let block4_msg = header_message(4);
1753        let _ = v3_sync
1754            .send_header(block2_msg.clone())
1755            .await;
1756        let _ = v3_sync
1757            .send_header(block3_msg.clone())
1758            .await;
1759        let _ = v3_sync
1760            .send_header(block4_msg.clone())
1761            .await;
1762        let _ = v2_sync
1763            .send_header(block4_msg.clone())
1764            .await;
1765        let fourth_msg = receive_message(&mut rx).await;
1766        assert!(matches!(
1767            fourth_msg
1768                .sync_states
1769                .get("uniswap-v2")
1770                .unwrap(),
1771            SynchronizerState::Ready(_)
1772        ));
1773        assert!(matches!(
1774            fourth_msg
1775                .sync_states
1776                .get("uniswap-v3")
1777                .unwrap(),
1778            SynchronizerState::Ready(_)
1779        ));
1780
1781        shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1782
1783        // Verify cleanup was triggered for both synchronizers
1784        assert!(
1785            v2_sync.was_close_received().await,
1786            "v2_sync should have received close signal during cleanup"
1787        );
1788        assert!(
1789            v3_sync.was_close_received().await,
1790            "v3_sync should have received close signal during cleanup"
1791        );
1792    }
1793
1794    #[test(tokio::test)]
1795    async fn test_all_synchronizer_advanced() {
1796        // Test the case were all synchronizers successfully recover but stream
1797        // from a disconnected future block.
1798
1799        let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1800
1801        let block3 = header_message(3);
1802        v2_sync
1803            .send_header(block3.clone())
1804            .await
1805            .unwrap();
1806        v3_sync
1807            .send_header(block3)
1808            .await
1809            .unwrap();
1810
1811        let msg = receive_message(&mut rx).await;
1812        matches!(
1813            msg.sync_states
1814                .get("uniswap-v2")
1815                .unwrap(),
1816            SynchronizerState::Ready(_)
1817        );
1818        matches!(
1819            msg.sync_states
1820                .get("uniswap-v3")
1821                .unwrap(),
1822            SynchronizerState::Ready(_)
1823        );
1824
1825        shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1826    }
1827
1828    #[test(tokio::test)]
1829    async fn test_one_synchronizer_advanced() {
1830        let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1831
1832        let block2 = header_message(2);
1833        let block4 = header_message(4);
1834        v2_sync
1835            .send_header(block4.clone())
1836            .await
1837            .unwrap();
1838        v3_sync
1839            .send_header(block2.clone())
1840            .await
1841            .unwrap();
1842
1843        let msg = receive_message(&mut rx).await;
1844        matches!(
1845            msg.sync_states
1846                .get("uniswap-v2")
1847                .unwrap(),
1848            SynchronizerState::Ready(_)
1849        );
1850        matches!(
1851            msg.sync_states
1852                .get("uniswap-v3")
1853                .unwrap(),
1854            SynchronizerState::Delayed(_)
1855        );
1856
1857        shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1858    }
1859
1860    #[test(tokio::test)]
1861    async fn test_partial_blocks_normal_operation() {
1862        // Normal operation: partials with arbitrary index increments, then advance to next block
1863        // Scenario: block 1 → partials 0,3,7 for block 2 → partials 0,2 for block 3
1864        let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1865
1866        // Partials for block 2 with arbitrary increments (only v2, v3 ignored)
1867        send_and_assert_ready(
1868            &v2_sync,
1869            "uniswap-v2",
1870            &mut rx,
1871            partial_header_message(2, 0),
1872            2,
1873            Some(0),
1874        )
1875        .await;
1876        send_and_assert_ready(
1877            &v2_sync,
1878            "uniswap-v2",
1879            &mut rx,
1880            partial_header_message(2, 3),
1881            2,
1882            Some(3),
1883        )
1884        .await;
1885        send_and_assert_ready(
1886            &v2_sync,
1887            "uniswap-v2",
1888            &mut rx,
1889            partial_header_message(2, 7),
1890            2,
1891            Some(7),
1892        )
1893        .await;
1894
1895        // Advance to block 3 with partials
1896        send_and_assert_ready(
1897            &v2_sync,
1898            "uniswap-v2",
1899            &mut rx,
1900            partial_header_message(3, 0),
1901            3,
1902            Some(0),
1903        )
1904        .await;
1905        send_and_assert_ready(
1906            &v2_sync,
1907            "uniswap-v2",
1908            &mut rx,
1909            partial_header_message(3, 2),
1910            3,
1911            Some(2),
1912        )
1913        .await;
1914
1915        shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1916    }
1917
1918    #[test(tokio::test)]
1919    async fn test_partial_blocks_handles_reverts() {
1920        // Revert resets state and allows continuation on new fork with full blocks
1921        // Scenario: block 1 → block 2 → partials for block 3 → revert to 2 → full block 3
1922        let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1923
1924        // Advance to full block 2 (only v2, v3 ignored)
1925        send_and_assert_ready(&v2_sync, "uniswap-v2", &mut rx, header_message(2), 2, None).await;
1926
1927        // Partials for block 3
1928        send_and_assert_ready(
1929            &v2_sync,
1930            "uniswap-v2",
1931            &mut rx,
1932            partial_header_message(3, 0),
1933            3,
1934            Some(0),
1935        )
1936        .await;
1937        send_and_assert_ready(
1938            &v2_sync,
1939            "uniswap-v2",
1940            &mut rx,
1941            partial_header_message(3, 2),
1942            3,
1943            Some(2),
1944        )
1945        .await;
1946
1947        // Revert to block 2
1948        send_and_assert_ready(&v2_sync, "uniswap-v2", &mut rx, revert_header_message(2), 2, None)
1949            .await;
1950
1951        // New fork: full block 3
1952        send_and_assert_ready(&v2_sync, "uniswap-v2", &mut rx, header_message(3), 3, None).await;
1953
1954        shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
1955    }
1956
1957    #[test(tokio::test)]
1958    async fn test_partial_blocks_delayed_synchronizer_catches_up() {
1959        // Delayed synchronizer catches up when receiving partial blocks
1960        // Scenario: v2 receives partials while v3 is delayed, then v3 catches up
1961        let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
1962
1963        // v2 receives partial 0 for block 2; v3 times out and becomes Delayed
1964        let partial_0 = partial_header_message(2, 0);
1965        v2_sync
1966            .send_header(partial_0.clone())
1967            .await
1968            .expect("send partial 0 failed");
1969
1970        let msg = receive_message(&mut rx).await;
1971        // v2 is Ready with partial 0, v3 is Delayed
1972        assert!(msg
1973            .state_msgs
1974            .contains_key("uniswap-v2"));
1975        assert!(!msg
1976            .state_msgs
1977            .contains_key("uniswap-v3"));
1978        assert!(matches!(
1979            msg.sync_states.get("uniswap-v2").unwrap(),
1980            SynchronizerState::Ready(h) if h.partial_block_index == Some(0)
1981        ));
1982        assert!(matches!(
1983            msg.sync_states
1984                .get("uniswap-v3")
1985                .unwrap(),
1986            SynchronizerState::Delayed(_)
1987        ));
1988
1989        // v2 advances to partial 2; v3 catches up by sending partial 0 then partial 2
1990        let partial_2 = partial_header_message(2, 2);
1991        v2_sync
1992            .send_header(partial_2.clone())
1993            .await
1994            .expect("send partial 2 failed");
1995        v3_sync
1996            .send_header(partial_0.clone())
1997            .await
1998            .expect("v3 catch up partial 0 failed");
1999        v3_sync
2000            .send_header(partial_2.clone())
2001            .await
2002            .expect("v3 catch up partial 2 failed");
2003
2004        // v3 catches up within a few message cycles
2005        let mut v3_ready = false;
2006        for _ in 0..3 {
2007            let msg = receive_message(&mut rx).await;
2008            if matches!(
2009                msg.sync_states.get("uniswap-v3").unwrap(),
2010                SynchronizerState::Ready(h) if h.partial_block_index == Some(2)
2011            ) {
2012                v3_ready = true;
2013                break;
2014            }
2015        }
2016        assert!(v3_ready, "v3 caught up to partial 2");
2017
2018        shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
2019    }
2020}