tycho_client/feed/
mod.rs

1use std::{
2    collections::{HashMap, HashSet},
3    time::Duration,
4};
5
6use chrono::{Duration as ChronoDuration, Local, NaiveDateTime};
7use futures03::{
8    future::{join_all, try_join_all},
9    stream::FuturesUnordered,
10    StreamExt,
11};
12use serde::{Deserialize, Serialize};
13use thiserror::Error;
14use tokio::{
15    select,
16    sync::{
17        mpsc::{self, Receiver},
18        oneshot,
19    },
20    task::JoinHandle,
21    time::timeout,
22};
23use tracing::{debug, error, info, trace, warn};
24use tycho_common::{
25    dto::{Block, ExtractorIdentity},
26    Bytes,
27};
28
29use crate::feed::{
30    block_history::{BlockHistory, BlockHistoryError, BlockPosition},
31    synchronizer::{StateSyncMessage, StateSynchronizer, SyncResult, SynchronizerError},
32};
33
34mod block_history;
35pub mod component_tracker;
36pub mod synchronizer;
37
/// A trait representing a minimal interface for types that behave like a block header.
///
/// This abstraction allows working with either full block headers (`BlockHeader`)
/// or simplified structures that only provide a timestamp (e.g., for RFQ logic).
///
/// Both methods take `self` by value, so calling either one consumes the header.
pub trait HeaderLike {
    /// Returns the full [`BlockHeader`] if the implementor can provide one.
    ///
    /// `BlockHeader` itself returns `Some(self)`. Implementors without a full header are
    /// presumably expected to return `None` (none is visible in this part of the file).
    fn block(self) -> Option<BlockHeader>;

    /// Returns a single `u64` identifying the header's position: the block number for
    /// `BlockHeader`, or (judging by the name) a timestamp for timestamp-only implementors.
    fn block_number_or_timestamp(self) -> u64;
}
46
/// Minimal description of a block as tracked by the feed.
#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, Eq, Hash)]
pub struct BlockHeader {
    /// Hash of the block.
    pub hash: Bytes,
    /// Block number (height).
    pub number: u64,
    /// Hash of the parent block.
    pub parent_hash: Bytes,
    /// Revert flag supplied by whoever builds the header.
    /// NOTE(review): presumably set when the header stems from a chain revert; confirm
    /// against the producers.
    pub revert: bool,
    /// Block timestamp; `from_block` derives it from `block.ts.timestamp()` cast to `u64`
    /// (presumably seconds since the Unix epoch).
    pub timestamp: u64,
}
55
56impl BlockHeader {
57    fn from_block(block: &Block, revert: bool) -> Self {
58        Self {
59            hash: block.hash.clone(),
60            number: block.number,
61            parent_hash: block.parent_hash.clone(),
62            revert,
63            timestamp: block.ts.timestamp() as u64,
64        }
65    }
66}
67
68impl HeaderLike for BlockHeader {
69    fn block(self) -> Option<BlockHeader> {
70        Some(self)
71    }
72
73    fn block_number_or_timestamp(self) -> u64 {
74        self.number
75    }
76}
77
/// Errors produced while starting or running the [`BlockSynchronizer`].
#[derive(Error, Debug)]
pub enum BlockSynchronizerError {
    /// A state synchronizer reported an error; `run` propagates it from the
    /// `initialize()` / `start()` calls.
    #[error("Failed to initialize synchronizer: {0}")]
    InitializationError(#[from] SynchronizerError),

    /// A block history operation (creating it, positioning a header, pushing a header) failed.
    #[error("Failed to process new block: {0}")]
    BlockHistoryError(#[from] BlockHistoryError),

    /// No usable synchronizer is left: either none delivered a first block at startup, all
    /// of them were purged, or only advanced ones remain.
    #[error("Not a single synchronizer was ready")]
    NoReadySynchronizers,

    /// `run` was called without any registered synchronizer.
    #[error("No synchronizers were set")]
    NoSynchronizers,

    /// The stale threshold could not be converted from `std::time::Duration` to a chrono
    /// duration; carries the conversion error's message.
    #[error("Failed to convert duration: {0}")]
    DurationConversionError(String),
}
95
/// Shorthand for results whose error type is [`BlockSynchronizerError`].
type BlockSyncResult<T> = Result<T, BlockSynchronizerError>;
97
/// Aligns multiple StateSynchronizers on the block dimension.
///
/// ## Purpose
/// The purpose of this component is to handle streams from multiple state synchronizers and
/// align/merge them according to their blocks. Ideally this should be done in a fault-tolerant way,
/// meaning we can recover from a state synchronizer suffering from timing issues. E.g. a delayed or
/// unresponsive state synchronizer might recover again, or an advanced state synchronizer can be
/// included again once we reach the block it is at.
///
/// ## Limitations
/// - Supports only chains with fixed block times for now due to the lock step mechanism.
///
/// ## Initialisation
/// Queries all registered synchronizers for their first message and evaluates the state of each
/// synchronizer. If a synchronizer's first message is an older block, it is marked as delayed.
/// If no message is received within the startup timeout (`block_time + max_wait`), the
/// synchronizer is marked as stale and is purged.
///
/// ## Main loop
/// Once started, the synchronizers are queried concurrently for messages in lock step:
/// each iteration builds a `FeedMessage` from the data retrieved so far and emits it, then asks
/// every synchronizer to advance to the next block.
///
/// ## Synchronization Logic
///
/// To classify a synchronizer as delayed, we need to first define the current block. The highest
/// block number of all ready synchronizers is considered the current block.
///
/// Once we have the current block we can easily determine which block we expect next. And if a
/// synchronizer delivers an older block we can classify it as delayed.
///
/// If any synchronizer is not in the ready state we will try to bring it back to the ready state.
/// This is done by trying to empty any buffers of a delayed synchronizer or waiting to reach
/// the height of an advanced synchronizer (and flagging it as such in the meantime).
///
/// Of course, we can't wait forever for a synchronizer to reply/recover. All of this must happen
/// within the block production step of the blockchain:
/// The wait procedure consists of waiting for any of the individual ProtocolStateSynchronizers
/// to emit a new message (within a max timeout - several multiples of the block time). Once a
/// message is received a very short timeout starts for the remaining synchronizers, to deliver a
/// message. Any synchronizer failing to do so is transitioned to delayed.
///
/// ### Note
/// The described process above is the goal. It is currently not implemented like that. Instead we
/// simply wait `block_time` + `max_wait`. Synchronizers are expected to respond within that
/// timeout. This is simpler but only works well on chains with fixed block times.
pub struct BlockSynchronizer<S> {
    /// Registered state synchronizers keyed by extractor. `None` until the first one is
    /// registered; `run` takes the map out again.
    synchronizers: Option<HashMap<ExtractorIdentity, S>>,
    /// Block time of the chain; `block_time + max_wait` is how long a synchronizer gets to
    /// deliver a message in each step.
    block_time: std::time::Duration,
    /// Extra waiting time granted on top of `block_time`.
    max_wait: std::time::Duration,
    /// Optional cap on the number of `FeedMessage`s emitted before the main loop stops.
    max_messages: Option<usize>,
    /// Number of block times without progress after which a synchronizer is considered stale
    /// (the stale threshold is `block_time * max_missed_blocks`).
    max_missed_blocks: u64,
}
152
/// Health of a single synchronizer stream as tracked by the `BlockSynchronizer`.
///
/// Serialized with an internal `status` tag holding the lowercase variant name.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "lowercase")]
pub enum SynchronizerState {
    /// Initial state of a freshly created stream, before its first message was evaluated.
    Started,
    /// The stream delivered the block that was expected next; carries that block's header.
    Ready(BlockHeader),
    // no progress, we consider it stale at that point we should purge it.
    Stale(BlockHeader),
    /// The stream is behind the expected block; carries the last header known for it.
    Delayed(BlockHeader),
    // For this to happen we must have a gap, and a gap usually means a new snapshot from the
    // StateSynchronizer. This can only happen if we are processing too slow and one of the
    // synchronizers restarts e.g. because Tycho ended the subscription.
    Advanced(BlockHeader),
    /// The stream's channel was closed; it will not deliver any further messages.
    Ended,
}
167
/// Receiving side of a single state synchronizer together with its tracked health.
pub struct SynchronizerStream {
    /// Extractor this stream belongs to; used for logging and as key in the feed's maps.
    extractor_id: ExtractorIdentity,
    /// Current health classification of the stream.
    state: SynchronizerState,
    /// Time of the last state modification; the stale check measures elapsed time from here.
    modify_ts: NaiveDateTime,
    /// Channel on which the synchronizer delivers its messages.
    rx: Receiver<StateSyncMessage<BlockHeader>>,
}
174
175impl SynchronizerStream {
176    async fn try_advance(
177        &mut self,
178        block_history: &BlockHistory,
179        max_wait: std::time::Duration,
180        stale_threshold: std::time::Duration,
181    ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
182        let extractor_id = self.extractor_id.clone();
183        let latest_block = block_history.latest();
184        match &self.state {
185            SynchronizerState::Started | SynchronizerState::Ended => {
186                warn!(state=?&self.state, "Advancing Synchronizer in this state not supported!");
187                Ok(None)
188            }
189            SynchronizerState::Advanced(b) => {
190                let future_block = b.clone();
191                // Transition to ready once we arrived at the expected height
192                self.transition(future_block, block_history, stale_threshold)?;
193                Ok(None)
194            }
195            SynchronizerState::Ready(previous_block) => {
196                // Try to recv the next expected block, update state accordingly.
197                self.try_recv_next_expected(
198                    max_wait,
199                    block_history,
200                    previous_block.clone(),
201                    stale_threshold,
202                )
203                .await
204                // TODO: if we entered advanced state we need to buffer the message for a while.
205            }
206            SynchronizerState::Delayed(old_block) | SynchronizerState::Stale(old_block) => {
207                // try to catch up all currently queued blocks until the expected block
208                debug!(
209                    ?old_block,
210                    ?latest_block,
211                    %extractor_id,
212                    "Trying to catch up to latest block"
213                );
214                self.try_catch_up(block_history, max_wait, stale_threshold)
215                    .await
216            }
217        }
218    }
219
220    /// Standard way to advance a well-behaved state synchronizer.
221    ///
222    /// Will wait for a new block on the synchronizer within a timeout. And modify its state based
223    /// on the outcome.
224    async fn try_recv_next_expected(
225        &mut self,
226        max_wait: std::time::Duration,
227        block_history: &BlockHistory,
228        previous_block: BlockHeader,
229        stale_threshold: std::time::Duration,
230    ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
231        let extractor_id = self.extractor_id.clone();
232        match timeout(max_wait, self.rx.recv()).await {
233            Ok(Some(msg)) => {
234                self.transition(msg.header.clone(), block_history, stale_threshold)?;
235                Ok(Some(msg))
236            }
237            Ok(None) => {
238                error!(
239                    %extractor_id,
240                    ?previous_block,
241                    "SynchronizerStream terminated: channel closed!"
242                );
243                self.state = SynchronizerState::Ended;
244                self.modify_ts = Local::now().naive_utc();
245                Ok(None)
246            }
247            Err(_) => {
248                // trying to advance a block timed out
249                debug!(%extractor_id, ?previous_block, "No block received within time limit.");
250
251                match &self.state {
252                    SynchronizerState::Ready(_) => {
253                        // First timeout: always transition to Delayed
254                        self.state = SynchronizerState::Delayed(previous_block.clone());
255                        self.modify_ts = Local::now().naive_utc();
256                    }
257                    SynchronizerState::Delayed(_) => {
258                        // Already delayed, check if we should go stale
259                        // DON'T update modify_ts here - we want to track time since first delay
260                        self.check_and_transition_to_stale_if_needed(
261                            stale_threshold,
262                            Some(previous_block.clone()),
263                        )?;
264                    }
265                    _ => {
266                        // For other states, use the stale check
267                        if !self.check_and_transition_to_stale_if_needed(
268                            stale_threshold,
269                            Some(previous_block.clone()),
270                        )? {
271                            self.state = SynchronizerState::Delayed(previous_block.clone());
272                            self.modify_ts = Local::now().naive_utc();
273                        }
274                    }
275                }
276                Ok(None)
277            }
278        }
279    }
280
281    /// Tries to catch up a delayed state synchronizer.
282    ///
283    /// If a synchronizer is delayed, this method will try to catch up to the next expected block
284    /// by consuming all waiting messages in its queue and waiting for any new block messages
285    /// within a timeout. Finally, all update messages are merged into one and returned.
286    async fn try_catch_up(
287        &mut self,
288        block_history: &BlockHistory,
289        max_wait: std::time::Duration,
290        stale_threshold: std::time::Duration,
291    ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
292        let mut results = Vec::new();
293        let extractor_id = self.extractor_id.clone();
294
295        // Set a deadline for the overall catch-up operation
296        let deadline = std::time::Instant::now() + max_wait;
297
298        while std::time::Instant::now() < deadline {
299            match timeout(
300                deadline.saturating_duration_since(std::time::Instant::now()),
301                self.rx.recv(),
302            )
303            .await
304            {
305                Ok(Some(msg)) => {
306                    debug!(%extractor_id, block_num=?msg.header.number, "Received new message during catch-up");
307                    let block_pos = block_history.determine_block_position(&msg.header)?;
308                    results.push(msg);
309                    if matches!(block_pos, BlockPosition::NextExpected) {
310                        break;
311                    }
312                }
313                Ok(None) => {
314                    warn!(%extractor_id, "Channel closed during catch-up");
315                    self.state = SynchronizerState::Ended;
316                    return Ok(None);
317                }
318                Err(_) => {
319                    debug!(%extractor_id, "Timed out waiting for catch-up");
320                    break;
321                }
322            }
323        }
324
325        let merged = results
326            .into_iter()
327            .reduce(|l, r| l.merge(r));
328
329        if let Some(msg) = merged {
330            // we were able to get at least one block out
331            debug!(?extractor_id, "Delayed extractor made progress!");
332            self.transition(msg.header.clone(), block_history, stale_threshold)?;
333            Ok(Some(msg))
334        } else {
335            // No progress made during catch-up, check if we should go stale
336            self.check_and_transition_to_stale_if_needed(stale_threshold, None)?;
337            Ok(None)
338        }
339    }
340
341    /// Helper method to check if synchronizer should transition to stale based on time elapsed
342    fn check_and_transition_to_stale_if_needed(
343        &mut self,
344        stale_threshold: std::time::Duration,
345        fallback_header: Option<BlockHeader>,
346    ) -> Result<bool, BlockSynchronizerError> {
347        let now = Local::now().naive_utc();
348        let wait_duration = now.signed_duration_since(self.modify_ts);
349        let stale_threshold_chrono = ChronoDuration::from_std(stale_threshold)
350            .map_err(|e| BlockSynchronizerError::DurationConversionError(e.to_string()))?;
351
352        if wait_duration > stale_threshold_chrono {
353            let header_to_use = match (&self.state, fallback_header) {
354                (SynchronizerState::Ready(h), _) |
355                (SynchronizerState::Delayed(h), _) |
356                (SynchronizerState::Stale(h), _) => h.clone(),
357                (_, Some(h)) => h,
358                _ => BlockHeader::default(),
359            };
360
361            warn!(
362                extractor_id=%self.extractor_id,
363                last_message_at=?self.modify_ts,
364                "SynchronizerStream transition to stale due to timeout."
365            );
366            self.state = SynchronizerState::Stale(header_to_use);
367            self.modify_ts = now;
368            Ok(true)
369        } else {
370            Ok(false)
371        }
372    }
373
374    /// Logic to transition a state synchronizer based on newly received block
375    ///
376    /// Updates the synchronizer's state according to the position of the received block:
377    /// - Next expected block -> Ready state
378    /// - Latest/Delayed block -> Either Delayed or Stale (if >60s since last update)
379    /// - Advanced block -> Advanced state (block ahead of expected position)
380    fn transition(
381        &mut self,
382        latest_retrieved: BlockHeader,
383        block_history: &BlockHistory,
384        stale_threshold: std::time::Duration,
385    ) -> Result<(), BlockSynchronizerError> {
386        let extractor_id = self.extractor_id.clone();
387        let last_message_at = self.modify_ts;
388        let block = &latest_retrieved;
389
390        match block_history.determine_block_position(&latest_retrieved)? {
391            BlockPosition::NextExpected => {
392                self.state = SynchronizerState::Ready(latest_retrieved.clone());
393                trace!(
394                    next = ?latest_retrieved,
395                    extractor = ?extractor_id,
396                    "SynchronizerStream transition to next expected"
397                )
398            }
399            BlockPosition::Latest | BlockPosition::Delayed => {
400                if !self.check_and_transition_to_stale_if_needed(
401                    stale_threshold,
402                    Some(latest_retrieved.clone()),
403                )? {
404                    warn!(
405                        ?extractor_id,
406                        ?last_message_at,
407                        ?block,
408                        "SynchronizerStream transition transition to delayed."
409                    );
410                    self.state = SynchronizerState::Delayed(latest_retrieved.clone());
411                }
412            }
413            BlockPosition::Advanced => {
414                error!(
415                    ?extractor_id,
416                    ?last_message_at,
417                    latest = ?block_history.latest(),
418                    ?block,
419                    "SynchronizerStream transition to advanced."
420                );
421                self.state = SynchronizerState::Advanced(latest_retrieved.clone());
422            }
423        }
424        self.modify_ts = Local::now().naive_utc();
425        Ok(())
426    }
427}
428
/// One emission of the block synchronizer: the state messages retrieved in a step together
/// with the health of every tracked synchronizer.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct FeedMessage<H = BlockHeader>
where
    H: HeaderLike,
{
    /// Messages retrieved in this step; `BlockSynchronizer::run` keys them by extractor name.
    pub state_msgs: HashMap<String, StateSyncMessage<H>>,
    /// State of each synchronizer at emission time; keyed by extractor name in `run`.
    pub sync_states: HashMap<String, SynchronizerState>,
}
437
438impl<H> FeedMessage<H>
439where
440    H: HeaderLike,
441{
442    fn new(
443        state_msgs: HashMap<String, StateSyncMessage<H>>,
444        sync_states: HashMap<String, SynchronizerState>,
445    ) -> Self {
446        Self { state_msgs, sync_states }
447    }
448}
449
450impl<S> BlockSynchronizer<S>
451where
452    S: StateSynchronizer,
453{
454    pub fn new(
455        block_time: std::time::Duration,
456        max_wait: std::time::Duration,
457        max_missed_blocks: u64,
458    ) -> Self {
459        Self { synchronizers: None, max_messages: None, block_time, max_wait, max_missed_blocks }
460    }
461
462    pub fn max_messages(&mut self, val: usize) {
463        self.max_messages = Some(val);
464    }
465
466    pub fn register_synchronizer(mut self, id: ExtractorIdentity, synchronizer: S) -> Self {
467        let mut registered = self.synchronizers.unwrap_or_default();
468        registered.insert(id, synchronizer);
469        self.synchronizers = Some(registered);
470        self
471    }
472
473    #[cfg(test)]
474    pub fn with_short_timeouts() -> Self {
475        Self::new(Duration::from_millis(10), Duration::from_millis(10), 3)
476    }
477
478    /// Cleanup function for shutting down remaining synchronizers when the nanny detects an error.
479    /// Sends close signals to all remaining synchronizers and waits for them to complete.
480    async fn cleanup_synchronizers(
481        mut state_sync_tasks: FuturesUnordered<JoinHandle<SyncResult<()>>>,
482        sync_close_senders: Vec<oneshot::Sender<()>>,
483    ) {
484        // Send close signals to all remaining synchronizers
485        for close_sender in sync_close_senders {
486            let _ = close_sender.send(());
487        }
488
489        // Await remaining tasks with timeout
490        let mut completed_tasks = 0;
491        while let Ok(Some(_)) = timeout(Duration::from_secs(5), state_sync_tasks.next()).await {
492            completed_tasks += 1;
493        }
494
495        // Warn if any synchronizers timed out during cleanup
496        let remaining_tasks = state_sync_tasks.len();
497        if remaining_tasks > 0 {
498            warn!(
499                completed = completed_tasks,
500                timed_out = remaining_tasks,
501                "Some synchronizers timed out during cleanup and may not have shut down cleanly"
502            );
503        }
504    }
505
506    pub async fn run(
507        mut self,
508    ) -> BlockSyncResult<(JoinHandle<()>, Receiver<FeedMessage<BlockHeader>>)> {
509        trace!("Starting BlockSynchronizer...");
510        let mut state_sync_tasks = FuturesUnordered::new();
511        let mut synchronizers = self
512            .synchronizers
513            .take()
514            .ok_or(BlockSynchronizerError::NoSynchronizers)?;
515        // init synchronizers
516        let init_tasks = synchronizers
517            .values_mut()
518            .map(|s| s.initialize())
519            .collect::<Vec<_>>();
520        try_join_all(init_tasks).await?;
521
522        let mut sync_streams = HashMap::with_capacity(synchronizers.len());
523        let mut sync_close_senders = Vec::new();
524        for (extractor_id, synchronizer) in synchronizers.drain() {
525            let (handle, rx) = synchronizer.start().await?;
526            let (join_handle, close_sender) = handle.split();
527            state_sync_tasks.push(join_handle);
528            sync_close_senders.push(close_sender);
529
530            sync_streams.insert(
531                extractor_id.clone(),
532                SynchronizerStream {
533                    extractor_id,
534                    state: SynchronizerState::Started,
535                    modify_ts: Local::now().naive_utc(),
536                    rx,
537                },
538            );
539        }
540
541        // startup, schedule first set of futures and wait for them to return to initialise
542        // synchronizers.
543        debug!("Waiting for initial synchronizer messages...");
544        let mut startup_futures = Vec::new();
545        for (id, sh) in sync_streams.iter_mut() {
546            let fut = async {
547                let res = timeout(self.block_time + self.max_wait, sh.rx.recv()).await;
548                (id.clone(), res)
549            };
550            startup_futures.push(fut);
551        }
552        let mut ready_sync_msgs = HashMap::new();
553        let initial_headers = join_all(startup_futures)
554            .await
555            .into_iter()
556            .filter_map(|(extractor_id, res)| {
557                let synchronizer = sync_streams
558                .get_mut(&extractor_id)
559                .unwrap();
560            match res {
561                Ok(Some(msg)) => {
562                    debug!(%extractor_id, height=?&msg.header.number, "Synchronizer started successfully!");
563                    // initially default all synchronizers to Ready
564                    synchronizer.state = SynchronizerState::Ready(msg.header.clone());
565                    synchronizer.modify_ts = Local::now().naive_utc();
566                    ready_sync_msgs.insert(extractor_id.name.clone(), msg.clone());
567                    Some(msg.header)
568                }
569                Ok(None) => {
570                    warn!(%extractor_id, "Dead synchronizer at startup will be purged!");
571                    synchronizer.state = SynchronizerState::Ended;
572                    synchronizer.modify_ts = Local::now().naive_utc();
573                    None
574                }
575                Err(_) => {
576                    warn!(%extractor_id, "Timed out waiting for first message: Stale synchronizer at startup will be purged!");
577                    synchronizer.state = SynchronizerState::Stale(BlockHeader::default());
578                    synchronizer.modify_ts = Local::now().naive_utc();
579                    None
580                }
581            }
582        })
583        .collect::<HashSet<_>>() // remove duplicates
584        .into_iter()
585        .collect::<Vec<_>>();
586
587        let mut block_history = BlockHistory::new(initial_headers, 15)?;
588
589        // Determine the starting header for synchronization
590        let start_header = block_history
591            .latest()
592            .ok_or(BlockSynchronizerError::NoReadySynchronizers)?;
593        info!(
594            start_block=?start_header,
595            n_healthy=?ready_sync_msgs.len(),
596            "Block synchronization started successfully!"
597        );
598
599        // Purge any stale synchronizers
600        // All synchronizers that did not timeout on start up are initialized as Ready, including
601        // those that are Delayed. Delayed synchronizers are identified and updated accordingly in
602        // the next step.
603        sync_streams.retain(|_, v| matches!(v.state, SynchronizerState::Ready(_)));
604
605        // Determine correct state for each remaining synchronizer, based on their header vs the
606        // latest one
607        for (_, stream) in sync_streams.iter_mut() {
608            if let SynchronizerState::Ready(header) = &stream.state.clone() {
609                if header.number < start_header.number {
610                    stream.state = SynchronizerState::Delayed(header.clone());
611                    debug!(
612                        extractor_id=%stream.extractor_id,
613                        synchronizer_block=?header.number,
614                        current_block=?start_header.number,
615                        "Marking synchronizer as delayed during initialization"
616                    );
617                }
618            }
619        }
620
621        let (sync_tx, sync_rx) = mpsc::channel(30);
622        let main_loop_jh: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
623            let mut n_iter = 1;
624            loop {
625                // Send retrieved data to receivers.
626                sync_tx
627                    .send(FeedMessage::new(
628                        std::mem::take(&mut ready_sync_msgs),
629                        sync_streams
630                            .iter()
631                            .map(|(a, b)| (a.name.to_string(), b.state.clone()))
632                            .collect(),
633                    ))
634                    .await?;
635
636                // Check if we have reached the max messages
637                if let Some(max_messages) = self.max_messages {
638                    if n_iter >= max_messages {
639                        info!(max_messages, "StreamEnd");
640                        return Ok(());
641                    }
642                }
643                n_iter += 1;
644
645                // Here we simply wait block_time + max_wait. This will not work for chains with
646                // unknown block times but is simple enough for now.
647                // If we would like to support unknown block times we could: Instruct all handles to
648                // await the max block time, if a header arrives within that time transition as
649                // usual, but via a select statement get notified (using e.g. Notify) if any other
650                // handle finishes before the timeout. Then await again but this time only for
651                // max_wait and then proceed as usual. So basically each try_advance task would have
652                // a select statement that allows it to exit the first timeout preemptively if any
653                // other try_advance task finished earlier.
654                let mut recv_futures = Vec::new();
655                for (extractor_id, sh) in sync_streams.iter_mut() {
656                    recv_futures.push(async {
657                        let res = sh
658                            .try_advance(
659                                &block_history,
660                                self.block_time + self.max_wait,
661                                self.block_time
662                                    .mul_f64(self.max_missed_blocks as f64),
663                            )
664                            .await?;
665                        Ok::<_, BlockSynchronizerError>(
666                            res.map(|msg| (extractor_id.name.clone(), msg)),
667                        )
668                    });
669                }
670                ready_sync_msgs.extend(
671                    join_all(recv_futures)
672                        .await
673                        .into_iter()
674                        .collect::<Result<Vec<_>, _>>()?
675                        .into_iter()
676                        .flatten(),
677                );
678
679                // Purge any bad synchronizers, respective warnings have already been issued at
680                // transition time.
681                sync_streams.retain(|_, v| match v.state {
682                    SynchronizerState::Started | SynchronizerState::Ended => false,
683                    SynchronizerState::Stale(_) => false,
684                    SynchronizerState::Ready(_) => true,
685                    SynchronizerState::Delayed(_) => true,
686                    SynchronizerState::Advanced(_) => true,
687                });
688
689                // Check if we have any active synchronizers (Ready, Delayed, or Advanced)
690                // If all synchronizers have been purged (Stale/Ended), exit the main loop
691                if sync_streams.is_empty() {
692                    error!("No healthy SynchronizerStream remain");
693                    return Err(BlockSynchronizerError::NoReadySynchronizers.into());
694                }
695
696                // Find the latest connected block header to advance history
697                if let Some(header) = sync_streams
698                    .values()
699                    .filter_map(|v| match &v.state {
700                        SynchronizerState::Ready(b) | SynchronizerState::Delayed(b) => Some(b),
701                        _ => None,
702                    })
703                    .max_by_key(|b| b.number)
704                {
705                    block_history.push(header.clone())?;
706                } else {
707                    // No Ready or Delayed synchronizers, but we still have some synchronizers
708                    // we can probably recover here but since this is unlikely we error for now
709                    error!("Only advanced SynchronizerStreams remain");
710                    return Err(BlockSynchronizerError::NoReadySynchronizers.into());
711                }
712            }
713        });
714
715        let nanny_jh = tokio::spawn(async move {
716            select! {
717                error = state_sync_tasks.select_next_some() => {
718                    Self::cleanup_synchronizers(state_sync_tasks, sync_close_senders).await;
719                    error!(?error, "State synchronizer exited");
720                },
721                error = main_loop_jh => {
722                    Self::cleanup_synchronizers(state_sync_tasks, sync_close_senders).await;
723                    error!(?error, "Feed main loop exited");
724                }
725            }
726        });
727        Ok((nanny_jh, sync_rx))
728    }
729}
730
731#[cfg(test)]
732mod tests {
733    use std::sync::Arc;
734
735    use async_trait::async_trait;
736    use test_log::test;
737    use tokio::sync::{oneshot, Mutex};
738    use tycho_common::dto::Chain;
739
740    use super::*;
741    use crate::feed::synchronizer::{SyncResult, SynchronizerTaskHandle};
742
    /// Selects how the mock synchronizer behaves; see the note on each variant.
    #[derive(Clone, Debug)]
    enum MockBehavior {
        Normal,          // Exit successfully when receiving close signal
        FailOnExit,      // Exit with error when receiving close signal
        IgnoreClose,     // Ignore close signals and hang (for timeout testing)
        ExitImmediately, // Exit immediately after first message (for quick failure testing)
    }
750
751    #[derive(Clone)]
752    struct MockStateSync {
753        header_tx: mpsc::Sender<StateSyncMessage<BlockHeader>>,
754        header_rx: Arc<Mutex<Option<Receiver<StateSyncMessage<BlockHeader>>>>>,
755        close_received: Arc<Mutex<bool>>,
756        behavior: MockBehavior,
757        // For testing: store the close sender so tests can trigger close signals
758        close_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
759    }
760
761    impl MockStateSync {
762        fn new() -> Self {
763            Self::with_behavior(MockBehavior::Normal)
764        }
765
766        fn with_behavior(behavior: MockBehavior) -> Self {
767            let (tx, rx) = mpsc::channel(1);
768            Self {
769                header_tx: tx,
770                header_rx: Arc::new(Mutex::new(Some(rx))),
771                close_received: Arc::new(Mutex::new(false)),
772                behavior,
773                close_tx: Arc::new(Mutex::new(None)),
774            }
775        }
776
777        async fn was_close_received(&self) -> bool {
778            *self.close_received.lock().await
779        }
780
781        async fn send_header(&self, header: StateSyncMessage<BlockHeader>) -> Result<(), String> {
782            self.header_tx
783                .send(header)
784                .await
785                .map_err(|e| format!("sending header failed: {e}"))
786        }
787
788        // For testing: trigger a close signal to make the synchronizer exit
789        async fn trigger_close(&self) {
790            if let Some(close_tx) = self.close_tx.lock().await.take() {
791                let _ = close_tx.send(());
792            }
793        }
794    }
795
796    #[async_trait]
797    impl StateSynchronizer for MockStateSync {
798        async fn initialize(&mut self) -> SyncResult<()> {
799            Ok(())
800        }
801
802        async fn start(
803            mut self,
804        ) -> SyncResult<(SynchronizerTaskHandle, Receiver<StateSyncMessage<BlockHeader>>)> {
805            let block_rx = {
806                let mut guard = self.header_rx.lock().await;
807                guard
808                    .take()
809                    .expect("Block receiver was not set!")
810            };
811
812            // Create close channel - we need to store one sender for testing and give one to the
813            // handle
814            let (close_tx_for_handle, close_rx) = oneshot::channel();
815            let (close_tx_for_test, close_rx_for_test) = oneshot::channel();
816
817            // Store the test close sender
818            {
819                let mut guard = self.close_tx.lock().await;
820                *guard = Some(close_tx_for_test);
821            }
822
823            let close_received_clone = self.close_received.clone();
824            let behavior = self.behavior.clone();
825
826            let jh = tokio::spawn(async move {
827                match behavior {
828                    MockBehavior::IgnoreClose => {
829                        // Infinite loop to simulate a hung synchronizer that doesn't respond to
830                        // close signals
831                        loop {
832                            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
833                        }
834                    }
835                    MockBehavior::ExitImmediately => {
836                        // Exit immediately with error to simulate immediate task failure
837                        SyncResult::Err(SynchronizerError::ConnectionError(
838                            "Simulated immediate task failure".to_string(),
839                        ))
840                    }
841                    MockBehavior::Normal | MockBehavior::FailOnExit => {
842                        // Wait for close signal from either handle or test, then respond based on
843                        // behavior
844                        let result = tokio::select! {
845                            result = close_rx => result,
846                            result = close_rx_for_test => result,
847                        };
848
849                        match result {
850                            Ok(()) => {
851                                // Mark that close signal was received
852                                let mut guard = close_received_clone.lock().await;
853                                *guard = true;
854
855                                match behavior {
856                                    MockBehavior::Normal => SyncResult::Ok(()),
857                                    MockBehavior::FailOnExit => {
858                                        SyncResult::Err(SynchronizerError::ConnectionError(
859                                            "Simulated task failure on close".to_string(),
860                                        ))
861                                    }
862                                    _ => unreachable!(),
863                                }
864                            }
865                            Err(_) => {
866                                // Close signal sender was dropped
867                                match behavior {
868                                    MockBehavior::Normal => SyncResult::Ok(()),
869                                    MockBehavior::FailOnExit => {
870                                        SyncResult::Err(SynchronizerError::ConnectionError(
871                                            "Simulated task failure on close sender drop"
872                                                .to_string(),
873                                        ))
874                                    }
875                                    _ => unreachable!(),
876                                }
877                            }
878                        }
879                    }
880                }
881            });
882
883            let handle = SynchronizerTaskHandle::new(jh, close_tx_for_handle);
884            Ok((handle, block_rx))
885        }
886    }
887
888    #[test(tokio::test)]
889    async fn test_two_ready_synchronizers() {
890        let v2_sync = MockStateSync::new();
891        let v3_sync = MockStateSync::new();
892        let block_sync = BlockSynchronizer::with_short_timeouts()
893            .register_synchronizer(
894                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
895                v2_sync.clone(),
896            )
897            .register_synchronizer(
898                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
899                v3_sync.clone(),
900            );
901        let start_msg = StateSyncMessage {
902            header: BlockHeader { number: 1, ..Default::default() },
903            ..Default::default()
904        };
905        v2_sync
906            .send_header(start_msg.clone())
907            .await
908            .expect("send_header failed");
909        v3_sync
910            .send_header(start_msg.clone())
911            .await
912            .expect("send_header failed");
913
914        let (_jh, mut rx) = block_sync
915            .run()
916            .await
917            .expect("BlockSynchronizer failed to start.");
918        let first_feed_msg = rx
919            .recv()
920            .await
921            .expect("header channel was closed");
922        let second_msg = StateSyncMessage {
923            header: BlockHeader { number: 2, ..Default::default() },
924            ..Default::default()
925        };
926        v2_sync
927            .send_header(second_msg.clone())
928            .await
929            .expect("send_header failed");
930        v3_sync
931            .send_header(second_msg.clone())
932            .await
933            .expect("send_header failed");
934        let second_feed_msg = rx
935            .recv()
936            .await
937            .expect("header channel was closed!");
938
939        let exp1 = FeedMessage {
940            state_msgs: [
941                ("uniswap-v2".to_string(), start_msg.clone()),
942                ("uniswap-v3".to_string(), start_msg.clone()),
943            ]
944            .into_iter()
945            .collect(),
946            sync_states: [
947                ("uniswap-v3".to_string(), SynchronizerState::Ready(start_msg.header.clone())),
948                ("uniswap-v2".to_string(), SynchronizerState::Ready(start_msg.header.clone())),
949            ]
950            .into_iter()
951            .collect(),
952        };
953        let exp2 = FeedMessage {
954            state_msgs: [
955                ("uniswap-v2".to_string(), second_msg.clone()),
956                ("uniswap-v3".to_string(), second_msg.clone()),
957            ]
958            .into_iter()
959            .collect(),
960            sync_states: [
961                ("uniswap-v3".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
962                ("uniswap-v2".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
963            ]
964            .into_iter()
965            .collect(),
966        };
967        assert_eq!(first_feed_msg, exp1);
968        assert_eq!(second_feed_msg, exp2);
969    }
970
971    #[test(tokio::test)]
972    async fn test_delayed_synchronizer_catches_up() {
973        let v2_sync = MockStateSync::new();
974        let v3_sync = MockStateSync::new();
975        let block_sync = BlockSynchronizer::with_short_timeouts()
976            .register_synchronizer(
977                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
978                v2_sync.clone(),
979            )
980            .register_synchronizer(
981                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
982                v3_sync.clone(),
983            );
984
985        // Initial messages - both synchronizers are at block 1
986        let block1_msg = StateSyncMessage {
987            header: BlockHeader {
988                number: 1,
989                hash: Bytes::from(vec![1]),
990                parent_hash: Bytes::from(vec![0]),
991                revert: false,
992                ..Default::default()
993            },
994            ..Default::default()
995        };
996        v2_sync
997            .send_header(block1_msg.clone())
998            .await
999            .expect("send_header failed");
1000        v3_sync
1001            .send_header(block1_msg.clone())
1002            .await
1003            .expect("send_header failed");
1004
1005        // Start the block synchronizer
1006        let (_jh, mut rx) = block_sync
1007            .run()
1008            .await
1009            .expect("BlockSynchronizer failed to start.");
1010
1011        // Consume the first message
1012        let first_feed_msg = rx
1013            .recv()
1014            .await
1015            .expect("header channel was closed");
1016        assert_eq!(first_feed_msg.state_msgs.len(), 2);
1017        assert!(matches!(
1018            first_feed_msg
1019                .sync_states
1020                .get("uniswap-v2")
1021                .unwrap(),
1022            SynchronizerState::Ready(_)
1023        ));
1024        assert!(matches!(
1025            first_feed_msg
1026                .sync_states
1027                .get("uniswap-v3")
1028                .unwrap(),
1029            SynchronizerState::Ready(_)
1030        ));
1031
1032        // Send block 2 to v2 synchronizer only
1033        let block2_msg = StateSyncMessage {
1034            header: BlockHeader {
1035                number: 2,
1036                hash: Bytes::from(vec![2]),
1037                parent_hash: Bytes::from(vec![1]),
1038                revert: false,
1039                ..Default::default()
1040            },
1041            ..Default::default()
1042        };
1043        v2_sync
1044            .send_header(block2_msg.clone())
1045            .await
1046            .expect("send_header failed");
1047
1048        // Consume second message - v3 should be delayed
1049        let second_feed_msg = rx
1050            .recv()
1051            .await
1052            .expect("header channel was closed");
1053        debug!("Consumed second message for v2");
1054
1055        assert!(second_feed_msg
1056            .state_msgs
1057            .contains_key("uniswap-v2"));
1058        assert!(matches!(
1059            second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1060            SynchronizerState::Ready(header) if header.number == 2
1061        ));
1062        assert!(!second_feed_msg
1063            .state_msgs
1064            .contains_key("uniswap-v3"));
1065        assert!(matches!(
1066            second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1067            SynchronizerState::Delayed(header) if header.number == 1
1068        ));
1069
1070        // Now v3 catches up to block 2
1071        v3_sync
1072            .send_header(block2_msg.clone())
1073            .await
1074            .expect("send_header failed");
1075
1076        // Both advance to block 3
1077        let block3_msg = StateSyncMessage {
1078            header: BlockHeader {
1079                number: 3,
1080                hash: Bytes::from(vec![3]),
1081                parent_hash: Bytes::from(vec![2]),
1082                revert: false,
1083                ..Default::default()
1084            },
1085            ..Default::default()
1086        };
1087        v2_sync
1088            .send_header(block3_msg.clone())
1089            .await
1090            .expect("send_header failed");
1091        v3_sync
1092            .send_header(block3_msg)
1093            .await
1094            .expect("send_header failed");
1095
1096        // Consume messages until we get both synchronizers on block 3
1097        // We may get an intermediate message for v3's catch-up or a combined message
1098        let mut third_feed_msg = rx
1099            .recv()
1100            .await
1101            .expect("header channel was closed");
1102
1103        // If this message doesn't have both univ2, it's an intermediate message, so we get the next
1104        // one
1105        if !third_feed_msg
1106            .state_msgs
1107            .contains_key("uniswap-v2")
1108        {
1109            third_feed_msg = rx
1110                .recv()
1111                .await
1112                .expect("header channel was closed");
1113        }
1114        assert!(third_feed_msg
1115            .state_msgs
1116            .contains_key("uniswap-v2"));
1117        assert!(third_feed_msg
1118            .state_msgs
1119            .contains_key("uniswap-v3"));
1120        assert!(matches!(
1121            third_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1122            SynchronizerState::Ready(header) if header.number == 3
1123        ));
1124        assert!(matches!(
1125            third_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1126            SynchronizerState::Ready(header) if header.number == 3
1127        ));
1128    }
1129
1130    #[test(tokio::test)]
1131    async fn test_different_start_blocks() {
1132        let v2_sync = MockStateSync::new();
1133        let v3_sync = MockStateSync::new();
1134        let block_sync = BlockSynchronizer::with_short_timeouts()
1135            .register_synchronizer(
1136                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
1137                v2_sync.clone(),
1138            )
1139            .register_synchronizer(
1140                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
1141                v3_sync.clone(),
1142            );
1143
1144        // Initial messages - synchronizers at different blocks
1145        let block1_msg = StateSyncMessage {
1146            header: BlockHeader {
1147                number: 1,
1148                hash: Bytes::from(vec![1]),
1149                parent_hash: Bytes::from(vec![0]),
1150                revert: false,
1151                ..Default::default()
1152            },
1153            ..Default::default()
1154        };
1155        let block2_msg = StateSyncMessage {
1156            header: BlockHeader {
1157                number: 2,
1158                hash: Bytes::from(vec![2]),
1159                parent_hash: Bytes::from(vec![1]),
1160                revert: false,
1161                ..Default::default()
1162            },
1163            ..Default::default()
1164        };
1165
1166        let _ = v2_sync
1167            .send_header(block1_msg.clone())
1168            .await;
1169        v3_sync
1170            .send_header(block2_msg.clone())
1171            .await
1172            .expect("send_header failed");
1173
1174        // Start the block synchronizer - it should use block 2 as the starting block
1175        let (_jh, mut rx) = block_sync
1176            .run()
1177            .await
1178            .expect("BlockSynchronizer failed to start.");
1179
1180        // Consume first message
1181        let first_feed_msg = rx
1182            .recv()
1183            .await
1184            .expect("header channel was closed");
1185        assert!(matches!(
1186            first_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1187            SynchronizerState::Delayed(header) if header.number == 1
1188        ));
1189        assert!(matches!(
1190            first_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1191            SynchronizerState::Ready(header) if header.number == 2
1192        ));
1193
1194        // Now v2 catches up to block 2
1195        v2_sync
1196            .send_header(block2_msg.clone())
1197            .await
1198            .expect("send_header failed");
1199
1200        // Both advance to block 3
1201        let block3_msg = StateSyncMessage {
1202            header: BlockHeader {
1203                number: 3,
1204                hash: Bytes::from(vec![3]),
1205                parent_hash: Bytes::from(vec![2]),
1206                revert: false,
1207                ..Default::default()
1208            },
1209            ..Default::default()
1210        };
1211        let _ = v2_sync
1212            .send_header(block3_msg.clone())
1213            .await;
1214        v3_sync
1215            .send_header(block3_msg.clone())
1216            .await
1217            .expect("send_header failed");
1218
1219        // Consume third message - both should be on block 3
1220        let second_feed_msg = rx
1221            .recv()
1222            .await
1223            .expect("header channel was closed");
1224        assert_eq!(second_feed_msg.state_msgs.len(), 2);
1225        assert!(matches!(
1226            second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1227            SynchronizerState::Ready(header) if header.number == 3
1228        ));
1229        assert!(matches!(
1230            second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1231            SynchronizerState::Ready(header) if header.number == 3
1232        ));
1233    }
1234
1235    #[test(tokio::test)]
1236    async fn test_synchronizer_task_failure_triggers_cleanup() {
1237        // Test Case 1: Verify that when a synchronizer task fails,
1238        // the nanny properly cleans up all other synchronizers
1239
1240        let v2_sync = MockStateSync::with_behavior(MockBehavior::ExitImmediately);
1241        let v3_sync = MockStateSync::new(); // Normal behavior
1242
1243        let block_sync = BlockSynchronizer::with_short_timeouts()
1244            .register_synchronizer(
1245                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
1246                v2_sync.clone(),
1247            )
1248            .register_synchronizer(
1249                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
1250                v3_sync.clone(),
1251            );
1252
1253        // Send initial messages
1254        let start_msg = StateSyncMessage {
1255            header: BlockHeader { number: 1, ..Default::default() },
1256            ..Default::default()
1257        };
1258        v2_sync
1259            .send_header(start_msg.clone())
1260            .await
1261            .expect("send_header failed");
1262        v3_sync
1263            .send_header(start_msg.clone())
1264            .await
1265            .expect("send_header failed");
1266
1267        // Start BlockSynchronizer - v2_sync will exit immediately with error
1268        let (nanny_handle, mut sync_rx) = block_sync
1269            .run()
1270            .await
1271            .expect("BlockSynchronizer failed to start");
1272
1273        // Consume first message to ensure at least one synchronizer is running
1274        let first_msg = sync_rx
1275            .recv()
1276            .await
1277            .expect("Should receive first message");
1278        // v2_sync might have already failed, so we might only get v3_sync message
1279        assert!(!first_msg.state_msgs.is_empty());
1280
1281        // Wait for nanny to detect task failure and execute cleanup
1282        let result = timeout(Duration::from_secs(2), nanny_handle).await;
1283        assert!(result.is_ok(), "Nanny should complete when synchronizer task exits");
1284
1285        // Give cleanup time to execute
1286        tokio::time::sleep(Duration::from_millis(50)).await;
1287
1288        // Verify that the remaining synchronizer received close signal during cleanup
1289        assert!(
1290            v3_sync.was_close_received().await,
1291            "v3_sync should have received close signal during cleanup"
1292        );
1293    }
1294
1295    #[test(tokio::test)]
1296    async fn test_synchronizer_task_exit_triggers_cleanup() {
1297        // Test Case 2: StateSynchronizer task exits with error on close, triggering nanny cleanup
1298        // This tests the first branch of the nanny's select statement
1299
1300        let v2_sync = MockStateSync::with_behavior(MockBehavior::FailOnExit);
1301        let v3_sync = MockStateSync::new();
1302
1303        let block_sync = BlockSynchronizer::with_short_timeouts()
1304            .register_synchronizer(
1305                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
1306                v2_sync.clone(),
1307            )
1308            .register_synchronizer(
1309                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
1310                v3_sync.clone(),
1311            );
1312
1313        // Send initial messages
1314        let start_msg = StateSyncMessage {
1315            header: BlockHeader { number: 1, ..Default::default() },
1316            ..Default::default()
1317        };
1318        v2_sync
1319            .send_header(start_msg.clone())
1320            .await
1321            .expect("send_header failed");
1322        v3_sync
1323            .send_header(start_msg.clone())
1324            .await
1325            .expect("send_header failed");
1326
1327        // Start BlockSynchronizer
1328        let (nanny_handle, mut sync_rx) = block_sync
1329            .run()
1330            .await
1331            .expect("BlockSynchronizer failed to start");
1332
1333        // Consume first message
1334        let first_msg = sync_rx
1335            .recv()
1336            .await
1337            .expect("Should receive first message");
1338        assert_eq!(first_msg.state_msgs.len(), 2);
1339
1340        // Send a close signal to v2_sync to make it exit with error (due to FailOnExit behavior)
1341        // This should trigger the first branch of the nanny's select statement
1342        v2_sync.trigger_close().await;
1343
1344        // Wait for nanny to detect synchronizer task exit and complete cleanup
1345        let result = timeout(Duration::from_secs(2), nanny_handle).await;
1346        assert!(result.is_ok(), "Nanny should complete when synchronizer task exits");
1347
1348        // Give cleanup time to execute
1349        tokio::time::sleep(Duration::from_millis(50)).await;
1350
1351        // Verify cleanup was triggered - v3_sync should have received close signal
1352        assert!(
1353            v3_sync.was_close_received().await,
1354            "v3_sync should have received close signal during cleanup"
1355        );
1356    }
1357
1358    #[test(tokio::test)]
1359    async fn test_main_loop_timeout_triggers_cleanup() {
1360        // Test Case 3: Main loop times out waiting for synchronizers
1361        // This simulates synchronizers becoming unresponsive
1362
1363        let v2_sync = MockStateSync::new();
1364        let v3_sync = MockStateSync::new();
1365
1366        let block_sync = BlockSynchronizer::with_short_timeouts()
1367            .register_synchronizer(
1368                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
1369                v2_sync.clone(),
1370            )
1371            .register_synchronizer(
1372                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
1373                v3_sync.clone(),
1374            );
1375
1376        // Send initial messages
1377        let start_msg = StateSyncMessage {
1378            header: BlockHeader { number: 1, ..Default::default() },
1379            ..Default::default()
1380        };
1381        v2_sync
1382            .send_header(start_msg.clone())
1383            .await
1384            .expect("send_header failed");
1385        v3_sync
1386            .send_header(start_msg.clone())
1387            .await
1388            .expect("send_header failed");
1389
1390        // Start BlockSynchronizer
1391        let (nanny_handle, mut sync_rx) = block_sync
1392            .run()
1393            .await
1394            .expect("BlockSynchronizer failed to start");
1395
1396        // Consume first message
1397        let first_msg = sync_rx
1398            .recv()
1399            .await
1400            .expect("Should receive first message");
1401        assert_eq!(first_msg.state_msgs.len(), 2);
1402
1403        // Don't send any more messages - synchronizers will become stale and eventually cause
1404        // main loop to error when no ready synchronizers remain
1405
1406        // Wait for main loop to error due to no ready synchronizers
1407        let result = timeout(Duration::from_secs(3), nanny_handle).await;
1408        assert!(
1409            result.is_ok(),
1410            "Nanny should complete when main loop errors due to no ready synchronizers"
1411        );
1412
1413        // Give cleanup time to execute
1414        tokio::time::sleep(Duration::from_millis(50)).await;
1415
1416        // Verify cleanup was triggered
1417        assert!(
1418            v2_sync.was_close_received().await,
1419            "v2_sync should have received close signal during cleanup"
1420        );
1421        assert!(
1422            v3_sync.was_close_received().await,
1423            "v3_sync should have received close signal during cleanup"
1424        );
1425    }
1426
1427    #[test(tokio::test)]
1428    async fn test_cleanup_timeout_warning() {
1429        // Verify that cleanup_synchronizers emits a warning when synchronizers timeout during
1430        // cleanup
1431
1432        let v2_sync = MockStateSync::with_behavior(MockBehavior::ExitImmediately);
1433        let v3_sync = MockStateSync::with_behavior(MockBehavior::IgnoreClose);
1434
1435        let block_sync = BlockSynchronizer::with_short_timeouts()
1436            .register_synchronizer(
1437                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
1438                v2_sync.clone(),
1439            )
1440            .register_synchronizer(
1441                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
1442                v3_sync.clone(),
1443            );
1444
1445        // Send initial messages
1446        let start_msg = StateSyncMessage {
1447            header: BlockHeader { number: 1, ..Default::default() },
1448            ..Default::default()
1449        };
1450        v2_sync
1451            .send_header(start_msg.clone())
1452            .await
1453            .expect("send_header failed");
1454        v3_sync
1455            .send_header(start_msg.clone())
1456            .await
1457            .expect("send_header failed");
1458
1459        // Start BlockSynchronizer - v2_sync will exit immediately, triggering cleanup
1460        // v3_sync will ignore close signals and timeout during cleanup
1461        let (nanny_handle, mut sync_rx) = block_sync
1462            .run()
1463            .await
1464            .expect("BlockSynchronizer failed to start");
1465
1466        // Might not get any messages if v2_sync fails before producing output
1467        let _ = sync_rx.recv().await;
1468
1469        // Wait for nanny to complete - cleanup should timeout on v3_sync but still complete
1470        let result = timeout(Duration::from_secs(10), nanny_handle).await;
1471        assert!(
1472            result.is_ok(),
1473            "Nanny should complete even when some synchronizers timeout during cleanup"
1474        );
1475
1476        // Note: In a real test environment, we would capture log output to verify the warning was
1477        // emitted. Since this is a unit test without log capture setup, we just verify that
1478        // cleanup completes even when some synchronizers timeout.
1479    }
1480
1481    #[test(tokio::test)]
1482    async fn test_one_synchronizer_goes_stale_while_other_works() {
1483        // Test Case 1: One protocol goes stale and is removed while another protocol works normally
1484
1485        let v2_sync = MockStateSync::new();
1486        let v3_sync = MockStateSync::new();
1487
1488        // Use reasonable timeouts to observe proper state transitions
1489        let mut block_sync = BlockSynchronizer::new(
1490            Duration::from_millis(20), // block_time
1491            Duration::from_millis(10), // max_wait
1492            2,                         // max_missed_blocks (stale threshold = 20ms * 2 = 40ms)
1493        );
1494        block_sync.max_messages(5); // Limit messages for test
1495
1496        let block_sync = block_sync
1497            .register_synchronizer(
1498                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
1499                v2_sync.clone(),
1500            )
1501            .register_synchronizer(
1502                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
1503                v3_sync.clone(),
1504            );
1505
1506        // Send initial messages to both synchronizers
1507        let block1_msg = StateSyncMessage {
1508            header: BlockHeader {
1509                number: 1,
1510                hash: Bytes::from(vec![1]),
1511                parent_hash: Bytes::from(vec![0]),
1512                revert: false,
1513                timestamp: 1000,
1514            },
1515            ..Default::default()
1516        };
1517        let _ = v2_sync
1518            .send_header(block1_msg.clone())
1519            .await;
1520        let _ = v3_sync
1521            .send_header(block1_msg.clone())
1522            .await;
1523
1524        // Start the block synchronizer
1525        let (nanny_handle, mut rx) = block_sync
1526            .run()
1527            .await
1528            .expect("BlockSynchronizer failed to start");
1529
1530        // Consume the first message - both should be ready
1531        let first_feed_msg = rx
1532            .recv()
1533            .await
1534            .expect("Should receive first message");
1535        assert_eq!(first_feed_msg.state_msgs.len(), 2);
1536        assert!(matches!(
1537            first_feed_msg
1538                .sync_states
1539                .get("uniswap-v2")
1540                .unwrap(),
1541            SynchronizerState::Ready(_)
1542        ));
1543        assert!(matches!(
1544            first_feed_msg
1545                .sync_states
1546                .get("uniswap-v3")
1547                .unwrap(),
1548            SynchronizerState::Ready(_)
1549        ));
1550
1551        // Send block 2 only to v3, v2 will timeout and become delayed
1552        let block2_msg = StateSyncMessage {
1553            header: BlockHeader {
1554                number: 2,
1555                hash: Bytes::from(vec![2]),
1556                parent_hash: Bytes::from(vec![1]),
1557                revert: false,
1558                timestamp: 2000,
1559            },
1560            ..Default::default()
1561        };
1562        let _ = v3_sync
1563            .send_header(block2_msg.clone())
1564            .await;
1565        // Don't send to v2_sync - it will timeout
1566
1567        // Consume second message - v2 should be delayed, v3 ready
1568        let second_feed_msg = rx
1569            .recv()
1570            .await
1571            .expect("Should receive second message");
1572        assert!(second_feed_msg
1573            .state_msgs
1574            .contains_key("uniswap-v3"));
1575        assert!(!second_feed_msg
1576            .state_msgs
1577            .contains_key("uniswap-v2"));
1578        assert!(matches!(
1579            second_feed_msg
1580                .sync_states
1581                .get("uniswap-v3")
1582                .unwrap(),
1583            SynchronizerState::Ready(_)
1584        ));
1585        // v2 should be delayed (if still present) - check nanny is still running
1586        if let Some(v2_state) = second_feed_msg
1587            .sync_states
1588            .get("uniswap-v2")
1589        {
1590            if matches!(v2_state, SynchronizerState::Delayed(_)) {
1591                // Verify nanny is still running when synchronizer is just delayed
1592                assert!(
1593                    !nanny_handle.is_finished(),
1594                    "Nanny should still be running when synchronizer is delayed (not stale yet)"
1595                );
1596            }
1597        }
1598
1599        // Wait a bit, then continue sending blocks to v3 but not v2
1600        tokio::time::sleep(Duration::from_millis(15)).await;
1601
1602        // Continue sending blocks only to v3 to keep it healthy while v2 goes stale
1603        let block3_msg = StateSyncMessage {
1604            header: BlockHeader {
1605                number: 3,
1606                hash: Bytes::from(vec![3]),
1607                parent_hash: Bytes::from(vec![2]),
1608                revert: false,
1609                timestamp: 3000,
1610            },
1611            ..Default::default()
1612        };
1613        let _ = v3_sync
1614            .send_header(block3_msg.clone())
1615            .await;
1616
1617        // Wait more time for v2 to go stale
1618        tokio::time::sleep(Duration::from_millis(100)).await;
1619
1620        // Consume remaining messages until we see v2 is removed
1621        let mut found_removed = false;
1622
1623        for _ in 0..3 {
1624            if let Some(msg) = rx.recv().await {
1625                if !msg
1626                    .sync_states
1627                    .contains_key("uniswap-v2")
1628                {
1629                    // v2 has been removed from sync_states
1630                    found_removed = true;
1631                }
1632
1633                // v3 should still be working (can be Ready, Delayed, or Advanced)
1634                if let Some(v3_state) = msg.sync_states.get("uniswap-v3") {
1635                    assert!(
1636                        !matches!(v3_state, SynchronizerState::Stale(_) | SynchronizerState::Ended),
1637                        "v3 should not be stale or ended, but was: {v3_state:?}"
1638                    );
1639                }
1640
1641                if found_removed {
1642                    break;
1643                }
1644            } else {
1645                break;
1646            }
1647        }
1648
1649        // v2 should be removed (it may go stale briefly but get purged immediately)
1650        assert!(found_removed, "v2 synchronizer should be removed after going stale");
1651        // Note: found_stale might be false if the stale synchronizer is purged immediately
1652    }
1653
1654    #[test(tokio::test)]
1655    async fn test_all_synchronizers_go_stale_main_loop_exits() {
1656        // Test Case 2: All protocols go stale and main loop exits gracefully
1657
1658        let v2_sync = MockStateSync::new();
1659        let v3_sync = MockStateSync::new();
1660
1661        // Use reasonable timeouts to observe proper state transitions
1662        let mut block_sync = BlockSynchronizer::new(
1663            Duration::from_millis(20), // block_time
1664            Duration::from_millis(10), // max_wait
1665            3,                         // max_missed_blocks (stale threshold = 20ms * 3 = 60ms)
1666        );
1667        block_sync.max_messages(10); // Allow enough messages to see the progression
1668
1669        let block_sync = block_sync
1670            .register_synchronizer(
1671                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
1672                v2_sync.clone(),
1673            )
1674            .register_synchronizer(
1675                ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
1676                v3_sync.clone(),
1677            );
1678
1679        // Send initial messages to both synchronizers
1680        let block1_msg = StateSyncMessage {
1681            header: BlockHeader {
1682                number: 1,
1683                hash: Bytes::from(vec![1]),
1684                parent_hash: Bytes::from(vec![0]),
1685                revert: false,
1686                timestamp: 1000,
1687            },
1688            ..Default::default()
1689        };
1690        let _ = v2_sync
1691            .send_header(block1_msg.clone())
1692            .await;
1693        let _ = v3_sync
1694            .send_header(block1_msg.clone())
1695            .await;
1696
1697        // Start the block synchronizer
1698        let (nanny_handle, mut rx) = block_sync
1699            .run()
1700            .await
1701            .expect("BlockSynchronizer failed to start");
1702
1703        // Consume the first message - both should be ready
1704        let first_feed_msg = rx
1705            .recv()
1706            .await
1707            .expect("Should receive first message");
1708        assert_eq!(first_feed_msg.state_msgs.len(), 2);
1709        assert!(matches!(
1710            first_feed_msg
1711                .sync_states
1712                .get("uniswap-v2")
1713                .unwrap(),
1714            SynchronizerState::Ready(_)
1715        ));
1716        assert!(matches!(
1717            first_feed_msg
1718                .sync_states
1719                .get("uniswap-v3")
1720                .unwrap(),
1721            SynchronizerState::Ready(_)
1722        ));
1723
1724        // Stop sending messages to both synchronizers - they should both timeout and go stale
1725        // Don't send any more messages, let them timeout and become delayed, then stale
1726
1727        // Monitor the state transitions to ensure proper delayed -> stale progression
1728        let mut seen_delayed = false;
1729
1730        // Consume messages and track state transitions
1731        // Give enough time for the synchronizers to transition through states
1732        let timeout_duration = Duration::from_millis(500); // Generous timeout
1733        let start_time = tokio::time::Instant::now();
1734
1735        while let Ok(Some(msg)) = tokio::time::timeout(Duration::from_millis(50), rx.recv()).await {
1736            // Track when synchronizers transition to delayed
1737            if !seen_delayed {
1738                let v2_state = msg.sync_states.get("uniswap-v2");
1739                let v3_state = msg.sync_states.get("uniswap-v3");
1740
1741                if matches!(v2_state, Some(SynchronizerState::Delayed(_))) ||
1742                    matches!(v3_state, Some(SynchronizerState::Delayed(_)))
1743                {
1744                    seen_delayed = true;
1745                    // Verify nanny is still running when synchronizers are just delayed
1746                    assert!(!nanny_handle.is_finished(),
1747                        "Nanny should still be running when synchronizers are delayed (not stale yet)");
1748                    // Once we've seen delayed and verified nanny is running, we can break
1749                    break;
1750                }
1751            }
1752
1753            // Safety timeout to avoid infinite loop
1754            if start_time.elapsed() > timeout_duration {
1755                break;
1756            }
1757        }
1758
1759        // Wait for the main loop to complete (all synchronizers should eventually go stale)
1760        tokio::time::sleep(Duration::from_millis(200)).await;
1761
1762        // Consume any remaining messages until channel closes
1763        while rx.recv().await.is_some() {
1764            // Just drain the channel
1765        }
1766        // Channel is now closed
1767
1768        // The nanny should complete when the main loop exits due to no ready synchronizers
1769        let nanny_result = timeout(Duration::from_secs(2), nanny_handle).await;
1770        assert!(nanny_result.is_ok(), "Nanny should complete when main loop exits");
1771
1772        // Verify that synchronizers went through proper state transitions
1773        assert!(seen_delayed, "Synchronizers should transition to Delayed state first");
1774        // Note: We might not see the Stale state because stale synchronizers are immediately purged
1775        // The important thing is that they stayed in Delayed for the proper duration before being
1776        // removed
1777
1778        // If we reach here, the channel was closed, indicating the main loop exited gracefully
1779
1780        // Give cleanup time to execute
1781        tokio::time::sleep(Duration::from_millis(50)).await;
1782
1783        // Verify cleanup was triggered for both synchronizers
1784        assert!(
1785            v2_sync.was_close_received().await,
1786            "v2_sync should have received close signal during cleanup"
1787        );
1788        assert!(
1789            v3_sync.was_close_received().await,
1790            "v3_sync should have received close signal during cleanup"
1791        );
1792    }
1793}