1use std::{
2 collections::{HashMap, HashSet},
3 time::Duration,
4};
5
6use chrono::{Duration as ChronoDuration, Local, NaiveDateTime};
7use futures03::{
8 future::{join_all, try_join_all},
9 stream::FuturesUnordered,
10 StreamExt,
11};
12use serde::{Deserialize, Serialize};
13use thiserror::Error;
14use tokio::{
15 select,
16 sync::{
17 mpsc::{self, Receiver},
18 oneshot,
19 },
20 task::JoinHandle,
21 time::timeout,
22};
23use tracing::{debug, error, info, trace, warn};
24use tycho_common::{
25 dto::{Block, ExtractorIdentity},
26 Bytes,
27};
28
29use crate::feed::{
30 block_history::{BlockHistory, BlockHistoryError, BlockPosition},
31 synchronizer::{StateSyncMessage, StateSynchronizer, SyncResult, SynchronizerError},
32};
33
34mod block_history;
35pub mod component_tracker;
36pub mod synchronizer;
37
38pub trait HeaderLike {
43 fn block(self) -> Option<BlockHeader>;
44 fn block_number_or_timestamp(self) -> u64;
45}
46
47#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, Eq, Hash)]
48pub struct BlockHeader {
49 pub hash: Bytes,
50 pub number: u64,
51 pub parent_hash: Bytes,
52 pub revert: bool,
53 pub timestamp: u64,
54}
55
56impl BlockHeader {
57 fn from_block(block: &Block, revert: bool) -> Self {
58 Self {
59 hash: block.hash.clone(),
60 number: block.number,
61 parent_hash: block.parent_hash.clone(),
62 revert,
63 timestamp: block.ts.timestamp() as u64,
64 }
65 }
66}
67
68impl HeaderLike for BlockHeader {
69 fn block(self) -> Option<BlockHeader> {
70 Some(self)
71 }
72
73 fn block_number_or_timestamp(self) -> u64 {
74 self.number
75 }
76}
77
78#[derive(Error, Debug)]
79pub enum BlockSynchronizerError {
80 #[error("Failed to initialize synchronizer: {0}")]
81 InitializationError(#[from] SynchronizerError),
82
83 #[error("Failed to process new block: {0}")]
84 BlockHistoryError(#[from] BlockHistoryError),
85
86 #[error("Not a single synchronizer was ready")]
87 NoReadySynchronizers,
88
89 #[error("No synchronizers were set")]
90 NoSynchronizers,
91
92 #[error("Failed to convert duration: {0}")]
93 DurationConversionError(String),
94}
95
96type BlockSyncResult<T> = Result<T, BlockSynchronizerError>;
97
98pub struct BlockSynchronizer<S> {
146 synchronizers: Option<HashMap<ExtractorIdentity, S>>,
147 block_time: std::time::Duration,
148 max_wait: std::time::Duration,
149 max_messages: Option<usize>,
150 max_missed_blocks: u64,
151}
152
153#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
154#[serde(tag = "status", rename_all = "lowercase")]
155pub enum SynchronizerState {
156 Started,
157 Ready(BlockHeader),
158 Stale(BlockHeader),
160 Delayed(BlockHeader),
161 Advanced(BlockHeader),
165 Ended,
166}
167
168pub struct SynchronizerStream {
169 extractor_id: ExtractorIdentity,
170 state: SynchronizerState,
171 modify_ts: NaiveDateTime,
172 rx: Receiver<StateSyncMessage<BlockHeader>>,
173}
174
175impl SynchronizerStream {
176 async fn try_advance(
177 &mut self,
178 block_history: &BlockHistory,
179 max_wait: std::time::Duration,
180 stale_threshold: std::time::Duration,
181 ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
182 let extractor_id = self.extractor_id.clone();
183 let latest_block = block_history.latest();
184 match &self.state {
185 SynchronizerState::Started | SynchronizerState::Ended => {
186 warn!(state=?&self.state, "Advancing Synchronizer in this state not supported!");
187 Ok(None)
188 }
189 SynchronizerState::Advanced(b) => {
190 let future_block = b.clone();
191 self.transition(future_block, block_history, stale_threshold)?;
193 Ok(None)
194 }
195 SynchronizerState::Ready(previous_block) => {
196 self.try_recv_next_expected(
198 max_wait,
199 block_history,
200 previous_block.clone(),
201 stale_threshold,
202 )
203 .await
204 }
206 SynchronizerState::Delayed(old_block) | SynchronizerState::Stale(old_block) => {
207 debug!(
209 ?old_block,
210 ?latest_block,
211 %extractor_id,
212 "Trying to catch up to latest block"
213 );
214 self.try_catch_up(block_history, max_wait, stale_threshold)
215 .await
216 }
217 }
218 }
219
220 async fn try_recv_next_expected(
225 &mut self,
226 max_wait: std::time::Duration,
227 block_history: &BlockHistory,
228 previous_block: BlockHeader,
229 stale_threshold: std::time::Duration,
230 ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
231 let extractor_id = self.extractor_id.clone();
232 match timeout(max_wait, self.rx.recv()).await {
233 Ok(Some(msg)) => {
234 self.transition(msg.header.clone(), block_history, stale_threshold)?;
235 Ok(Some(msg))
236 }
237 Ok(None) => {
238 error!(
239 %extractor_id,
240 ?previous_block,
241 "SynchronizerStream terminated: channel closed!"
242 );
243 self.state = SynchronizerState::Ended;
244 self.modify_ts = Local::now().naive_utc();
245 Ok(None)
246 }
247 Err(_) => {
248 debug!(%extractor_id, ?previous_block, "No block received within time limit.");
250
251 match &self.state {
252 SynchronizerState::Ready(_) => {
253 self.state = SynchronizerState::Delayed(previous_block.clone());
255 self.modify_ts = Local::now().naive_utc();
256 }
257 SynchronizerState::Delayed(_) => {
258 self.check_and_transition_to_stale_if_needed(
261 stale_threshold,
262 Some(previous_block.clone()),
263 )?;
264 }
265 _ => {
266 if !self.check_and_transition_to_stale_if_needed(
268 stale_threshold,
269 Some(previous_block.clone()),
270 )? {
271 self.state = SynchronizerState::Delayed(previous_block.clone());
272 self.modify_ts = Local::now().naive_utc();
273 }
274 }
275 }
276 Ok(None)
277 }
278 }
279 }
280
281 async fn try_catch_up(
287 &mut self,
288 block_history: &BlockHistory,
289 max_wait: std::time::Duration,
290 stale_threshold: std::time::Duration,
291 ) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
292 let mut results = Vec::new();
293 let extractor_id = self.extractor_id.clone();
294
295 let deadline = std::time::Instant::now() + max_wait;
297
298 while std::time::Instant::now() < deadline {
299 match timeout(
300 deadline.saturating_duration_since(std::time::Instant::now()),
301 self.rx.recv(),
302 )
303 .await
304 {
305 Ok(Some(msg)) => {
306 debug!(%extractor_id, block_num=?msg.header.number, "Received new message during catch-up");
307 let block_pos = block_history.determine_block_position(&msg.header)?;
308 results.push(msg);
309 if matches!(block_pos, BlockPosition::NextExpected) {
310 break;
311 }
312 }
313 Ok(None) => {
314 warn!(%extractor_id, "Channel closed during catch-up");
315 self.state = SynchronizerState::Ended;
316 return Ok(None);
317 }
318 Err(_) => {
319 debug!(%extractor_id, "Timed out waiting for catch-up");
320 break;
321 }
322 }
323 }
324
325 let merged = results
326 .into_iter()
327 .reduce(|l, r| l.merge(r));
328
329 if let Some(msg) = merged {
330 debug!(?extractor_id, "Delayed extractor made progress!");
332 self.transition(msg.header.clone(), block_history, stale_threshold)?;
333 Ok(Some(msg))
334 } else {
335 self.check_and_transition_to_stale_if_needed(stale_threshold, None)?;
337 Ok(None)
338 }
339 }
340
341 fn check_and_transition_to_stale_if_needed(
343 &mut self,
344 stale_threshold: std::time::Duration,
345 fallback_header: Option<BlockHeader>,
346 ) -> Result<bool, BlockSynchronizerError> {
347 let now = Local::now().naive_utc();
348 let wait_duration = now.signed_duration_since(self.modify_ts);
349 let stale_threshold_chrono = ChronoDuration::from_std(stale_threshold)
350 .map_err(|e| BlockSynchronizerError::DurationConversionError(e.to_string()))?;
351
352 if wait_duration > stale_threshold_chrono {
353 let header_to_use = match (&self.state, fallback_header) {
354 (SynchronizerState::Ready(h), _) |
355 (SynchronizerState::Delayed(h), _) |
356 (SynchronizerState::Stale(h), _) => h.clone(),
357 (_, Some(h)) => h,
358 _ => BlockHeader::default(),
359 };
360
361 warn!(
362 extractor_id=%self.extractor_id,
363 last_message_at=?self.modify_ts,
364 "SynchronizerStream transition to stale due to timeout."
365 );
366 self.state = SynchronizerState::Stale(header_to_use);
367 self.modify_ts = now;
368 Ok(true)
369 } else {
370 Ok(false)
371 }
372 }
373
374 fn transition(
381 &mut self,
382 latest_retrieved: BlockHeader,
383 block_history: &BlockHistory,
384 stale_threshold: std::time::Duration,
385 ) -> Result<(), BlockSynchronizerError> {
386 let extractor_id = self.extractor_id.clone();
387 let last_message_at = self.modify_ts;
388 let block = &latest_retrieved;
389
390 match block_history.determine_block_position(&latest_retrieved)? {
391 BlockPosition::NextExpected => {
392 self.state = SynchronizerState::Ready(latest_retrieved.clone());
393 trace!(
394 next = ?latest_retrieved,
395 extractor = ?extractor_id,
396 "SynchronizerStream transition to next expected"
397 )
398 }
399 BlockPosition::Latest | BlockPosition::Delayed => {
400 if !self.check_and_transition_to_stale_if_needed(
401 stale_threshold,
402 Some(latest_retrieved.clone()),
403 )? {
404 warn!(
405 ?extractor_id,
406 ?last_message_at,
407 ?block,
408 "SynchronizerStream transition transition to delayed."
409 );
410 self.state = SynchronizerState::Delayed(latest_retrieved.clone());
411 }
412 }
413 BlockPosition::Advanced => {
414 error!(
415 ?extractor_id,
416 ?last_message_at,
417 latest = ?block_history.latest(),
418 ?block,
419 "SynchronizerStream transition to advanced."
420 );
421 self.state = SynchronizerState::Advanced(latest_retrieved.clone());
422 }
423 }
424 self.modify_ts = Local::now().naive_utc();
425 Ok(())
426 }
427}
428
429#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
430pub struct FeedMessage<H = BlockHeader>
431where
432 H: HeaderLike,
433{
434 pub state_msgs: HashMap<String, StateSyncMessage<H>>,
435 pub sync_states: HashMap<String, SynchronizerState>,
436}
437
438impl<H> FeedMessage<H>
439where
440 H: HeaderLike,
441{
442 fn new(
443 state_msgs: HashMap<String, StateSyncMessage<H>>,
444 sync_states: HashMap<String, SynchronizerState>,
445 ) -> Self {
446 Self { state_msgs, sync_states }
447 }
448}
449
450impl<S> BlockSynchronizer<S>
451where
452 S: StateSynchronizer,
453{
454 pub fn new(
455 block_time: std::time::Duration,
456 max_wait: std::time::Duration,
457 max_missed_blocks: u64,
458 ) -> Self {
459 Self { synchronizers: None, max_messages: None, block_time, max_wait, max_missed_blocks }
460 }
461
462 pub fn max_messages(&mut self, val: usize) {
463 self.max_messages = Some(val);
464 }
465
466 pub fn register_synchronizer(mut self, id: ExtractorIdentity, synchronizer: S) -> Self {
467 let mut registered = self.synchronizers.unwrap_or_default();
468 registered.insert(id, synchronizer);
469 self.synchronizers = Some(registered);
470 self
471 }
472
473 #[cfg(test)]
474 pub fn with_short_timeouts() -> Self {
475 Self::new(Duration::from_millis(10), Duration::from_millis(10), 3)
476 }
477
478 async fn cleanup_synchronizers(
481 mut state_sync_tasks: FuturesUnordered<JoinHandle<SyncResult<()>>>,
482 sync_close_senders: Vec<oneshot::Sender<()>>,
483 ) {
484 for close_sender in sync_close_senders {
486 let _ = close_sender.send(());
487 }
488
489 let mut completed_tasks = 0;
491 while let Ok(Some(_)) = timeout(Duration::from_secs(5), state_sync_tasks.next()).await {
492 completed_tasks += 1;
493 }
494
495 let remaining_tasks = state_sync_tasks.len();
497 if remaining_tasks > 0 {
498 warn!(
499 completed = completed_tasks,
500 timed_out = remaining_tasks,
501 "Some synchronizers timed out during cleanup and may not have shut down cleanly"
502 );
503 }
504 }
505
506 pub async fn run(
507 mut self,
508 ) -> BlockSyncResult<(JoinHandle<()>, Receiver<FeedMessage<BlockHeader>>)> {
509 trace!("Starting BlockSynchronizer...");
510 let mut state_sync_tasks = FuturesUnordered::new();
511 let mut synchronizers = self
512 .synchronizers
513 .take()
514 .ok_or(BlockSynchronizerError::NoSynchronizers)?;
515 let init_tasks = synchronizers
517 .values_mut()
518 .map(|s| s.initialize())
519 .collect::<Vec<_>>();
520 try_join_all(init_tasks).await?;
521
522 let mut sync_streams = HashMap::with_capacity(synchronizers.len());
523 let mut sync_close_senders = Vec::new();
524 for (extractor_id, synchronizer) in synchronizers.drain() {
525 let (handle, rx) = synchronizer.start().await?;
526 let (join_handle, close_sender) = handle.split();
527 state_sync_tasks.push(join_handle);
528 sync_close_senders.push(close_sender);
529
530 sync_streams.insert(
531 extractor_id.clone(),
532 SynchronizerStream {
533 extractor_id,
534 state: SynchronizerState::Started,
535 modify_ts: Local::now().naive_utc(),
536 rx,
537 },
538 );
539 }
540
541 debug!("Waiting for initial synchronizer messages...");
544 let mut startup_futures = Vec::new();
545 for (id, sh) in sync_streams.iter_mut() {
546 let fut = async {
547 let res = timeout(self.block_time + self.max_wait, sh.rx.recv()).await;
548 (id.clone(), res)
549 };
550 startup_futures.push(fut);
551 }
552 let mut ready_sync_msgs = HashMap::new();
553 let initial_headers = join_all(startup_futures)
554 .await
555 .into_iter()
556 .filter_map(|(extractor_id, res)| {
557 let synchronizer = sync_streams
558 .get_mut(&extractor_id)
559 .unwrap();
560 match res {
561 Ok(Some(msg)) => {
562 debug!(%extractor_id, height=?&msg.header.number, "Synchronizer started successfully!");
563 synchronizer.state = SynchronizerState::Ready(msg.header.clone());
565 synchronizer.modify_ts = Local::now().naive_utc();
566 ready_sync_msgs.insert(extractor_id.name.clone(), msg.clone());
567 Some(msg.header)
568 }
569 Ok(None) => {
570 warn!(%extractor_id, "Dead synchronizer at startup will be purged!");
571 synchronizer.state = SynchronizerState::Ended;
572 synchronizer.modify_ts = Local::now().naive_utc();
573 None
574 }
575 Err(_) => {
576 warn!(%extractor_id, "Timed out waiting for first message: Stale synchronizer at startup will be purged!");
577 synchronizer.state = SynchronizerState::Stale(BlockHeader::default());
578 synchronizer.modify_ts = Local::now().naive_utc();
579 None
580 }
581 }
582 })
583 .collect::<HashSet<_>>() .into_iter()
585 .collect::<Vec<_>>();
586
587 let mut block_history = BlockHistory::new(initial_headers, 15)?;
588
589 let start_header = block_history
591 .latest()
592 .ok_or(BlockSynchronizerError::NoReadySynchronizers)?;
593 info!(
594 start_block=?start_header,
595 n_healthy=?ready_sync_msgs.len(),
596 "Block synchronization started successfully!"
597 );
598
599 sync_streams.retain(|_, v| matches!(v.state, SynchronizerState::Ready(_)));
604
605 for (_, stream) in sync_streams.iter_mut() {
608 if let SynchronizerState::Ready(header) = &stream.state.clone() {
609 if header.number < start_header.number {
610 stream.state = SynchronizerState::Delayed(header.clone());
611 debug!(
612 extractor_id=%stream.extractor_id,
613 synchronizer_block=?header.number,
614 current_block=?start_header.number,
615 "Marking synchronizer as delayed during initialization"
616 );
617 }
618 }
619 }
620
621 let (sync_tx, sync_rx) = mpsc::channel(30);
622 let main_loop_jh: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
623 let mut n_iter = 1;
624 loop {
625 sync_tx
627 .send(FeedMessage::new(
628 std::mem::take(&mut ready_sync_msgs),
629 sync_streams
630 .iter()
631 .map(|(a, b)| (a.name.to_string(), b.state.clone()))
632 .collect(),
633 ))
634 .await?;
635
636 if let Some(max_messages) = self.max_messages {
638 if n_iter >= max_messages {
639 info!(max_messages, "StreamEnd");
640 return Ok(());
641 }
642 }
643 n_iter += 1;
644
645 let mut recv_futures = Vec::new();
655 for (extractor_id, sh) in sync_streams.iter_mut() {
656 recv_futures.push(async {
657 let res = sh
658 .try_advance(
659 &block_history,
660 self.block_time + self.max_wait,
661 self.block_time
662 .mul_f64(self.max_missed_blocks as f64),
663 )
664 .await?;
665 Ok::<_, BlockSynchronizerError>(
666 res.map(|msg| (extractor_id.name.clone(), msg)),
667 )
668 });
669 }
670 ready_sync_msgs.extend(
671 join_all(recv_futures)
672 .await
673 .into_iter()
674 .collect::<Result<Vec<_>, _>>()?
675 .into_iter()
676 .flatten(),
677 );
678
679 sync_streams.retain(|_, v| match v.state {
682 SynchronizerState::Started | SynchronizerState::Ended => false,
683 SynchronizerState::Stale(_) => false,
684 SynchronizerState::Ready(_) => true,
685 SynchronizerState::Delayed(_) => true,
686 SynchronizerState::Advanced(_) => true,
687 });
688
689 if sync_streams.is_empty() {
692 error!("No healthy SynchronizerStream remain");
693 return Err(BlockSynchronizerError::NoReadySynchronizers.into());
694 }
695
696 if let Some(header) = sync_streams
698 .values()
699 .filter_map(|v| match &v.state {
700 SynchronizerState::Ready(b) | SynchronizerState::Delayed(b) => Some(b),
701 _ => None,
702 })
703 .max_by_key(|b| b.number)
704 {
705 block_history.push(header.clone())?;
706 } else {
707 error!("Only advanced SynchronizerStreams remain");
710 return Err(BlockSynchronizerError::NoReadySynchronizers.into());
711 }
712 }
713 });
714
715 let nanny_jh = tokio::spawn(async move {
716 select! {
717 error = state_sync_tasks.select_next_some() => {
718 Self::cleanup_synchronizers(state_sync_tasks, sync_close_senders).await;
719 error!(?error, "State synchronizer exited");
720 },
721 error = main_loop_jh => {
722 Self::cleanup_synchronizers(state_sync_tasks, sync_close_senders).await;
723 error!(?error, "Feed main loop exited");
724 }
725 }
726 });
727 Ok((nanny_jh, sync_rx))
728 }
729}
730
731#[cfg(test)]
732mod tests {
733 use std::sync::Arc;
734
735 use async_trait::async_trait;
736 use test_log::test;
737 use tokio::sync::{oneshot, Mutex};
738 use tycho_common::dto::Chain;
739
740 use super::*;
741 use crate::feed::synchronizer::{SyncResult, SynchronizerTaskHandle};
742
743 #[derive(Clone, Debug)]
744 enum MockBehavior {
745 Normal, FailOnExit, IgnoreClose, ExitImmediately, }
750
751 #[derive(Clone)]
752 struct MockStateSync {
753 header_tx: mpsc::Sender<StateSyncMessage<BlockHeader>>,
754 header_rx: Arc<Mutex<Option<Receiver<StateSyncMessage<BlockHeader>>>>>,
755 close_received: Arc<Mutex<bool>>,
756 behavior: MockBehavior,
757 close_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
759 }
760
761 impl MockStateSync {
762 fn new() -> Self {
763 Self::with_behavior(MockBehavior::Normal)
764 }
765
766 fn with_behavior(behavior: MockBehavior) -> Self {
767 let (tx, rx) = mpsc::channel(1);
768 Self {
769 header_tx: tx,
770 header_rx: Arc::new(Mutex::new(Some(rx))),
771 close_received: Arc::new(Mutex::new(false)),
772 behavior,
773 close_tx: Arc::new(Mutex::new(None)),
774 }
775 }
776
777 async fn was_close_received(&self) -> bool {
778 *self.close_received.lock().await
779 }
780
781 async fn send_header(&self, header: StateSyncMessage<BlockHeader>) -> Result<(), String> {
782 self.header_tx
783 .send(header)
784 .await
785 .map_err(|e| format!("sending header failed: {e}"))
786 }
787
788 async fn trigger_close(&self) {
790 if let Some(close_tx) = self.close_tx.lock().await.take() {
791 let _ = close_tx.send(());
792 }
793 }
794 }
795
796 #[async_trait]
797 impl StateSynchronizer for MockStateSync {
798 async fn initialize(&mut self) -> SyncResult<()> {
799 Ok(())
800 }
801
802 async fn start(
803 mut self,
804 ) -> SyncResult<(SynchronizerTaskHandle, Receiver<StateSyncMessage<BlockHeader>>)> {
805 let block_rx = {
806 let mut guard = self.header_rx.lock().await;
807 guard
808 .take()
809 .expect("Block receiver was not set!")
810 };
811
812 let (close_tx_for_handle, close_rx) = oneshot::channel();
815 let (close_tx_for_test, close_rx_for_test) = oneshot::channel();
816
817 {
819 let mut guard = self.close_tx.lock().await;
820 *guard = Some(close_tx_for_test);
821 }
822
823 let close_received_clone = self.close_received.clone();
824 let behavior = self.behavior.clone();
825
826 let jh = tokio::spawn(async move {
827 match behavior {
828 MockBehavior::IgnoreClose => {
829 loop {
832 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
833 }
834 }
835 MockBehavior::ExitImmediately => {
836 SyncResult::Err(SynchronizerError::ConnectionError(
838 "Simulated immediate task failure".to_string(),
839 ))
840 }
841 MockBehavior::Normal | MockBehavior::FailOnExit => {
842 let result = tokio::select! {
845 result = close_rx => result,
846 result = close_rx_for_test => result,
847 };
848
849 match result {
850 Ok(()) => {
851 let mut guard = close_received_clone.lock().await;
853 *guard = true;
854
855 match behavior {
856 MockBehavior::Normal => SyncResult::Ok(()),
857 MockBehavior::FailOnExit => {
858 SyncResult::Err(SynchronizerError::ConnectionError(
859 "Simulated task failure on close".to_string(),
860 ))
861 }
862 _ => unreachable!(),
863 }
864 }
865 Err(_) => {
866 match behavior {
868 MockBehavior::Normal => SyncResult::Ok(()),
869 MockBehavior::FailOnExit => {
870 SyncResult::Err(SynchronizerError::ConnectionError(
871 "Simulated task failure on close sender drop"
872 .to_string(),
873 ))
874 }
875 _ => unreachable!(),
876 }
877 }
878 }
879 }
880 }
881 });
882
883 let handle = SynchronizerTaskHandle::new(jh, close_tx_for_handle);
884 Ok((handle, block_rx))
885 }
886 }
887
888 #[test(tokio::test)]
889 async fn test_two_ready_synchronizers() {
890 let v2_sync = MockStateSync::new();
891 let v3_sync = MockStateSync::new();
892 let block_sync = BlockSynchronizer::with_short_timeouts()
893 .register_synchronizer(
894 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
895 v2_sync.clone(),
896 )
897 .register_synchronizer(
898 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
899 v3_sync.clone(),
900 );
901 let start_msg = StateSyncMessage {
902 header: BlockHeader { number: 1, ..Default::default() },
903 ..Default::default()
904 };
905 v2_sync
906 .send_header(start_msg.clone())
907 .await
908 .expect("send_header failed");
909 v3_sync
910 .send_header(start_msg.clone())
911 .await
912 .expect("send_header failed");
913
914 let (_jh, mut rx) = block_sync
915 .run()
916 .await
917 .expect("BlockSynchronizer failed to start.");
918 let first_feed_msg = rx
919 .recv()
920 .await
921 .expect("header channel was closed");
922 let second_msg = StateSyncMessage {
923 header: BlockHeader { number: 2, ..Default::default() },
924 ..Default::default()
925 };
926 v2_sync
927 .send_header(second_msg.clone())
928 .await
929 .expect("send_header failed");
930 v3_sync
931 .send_header(second_msg.clone())
932 .await
933 .expect("send_header failed");
934 let second_feed_msg = rx
935 .recv()
936 .await
937 .expect("header channel was closed!");
938
939 let exp1 = FeedMessage {
940 state_msgs: [
941 ("uniswap-v2".to_string(), start_msg.clone()),
942 ("uniswap-v3".to_string(), start_msg.clone()),
943 ]
944 .into_iter()
945 .collect(),
946 sync_states: [
947 ("uniswap-v3".to_string(), SynchronizerState::Ready(start_msg.header.clone())),
948 ("uniswap-v2".to_string(), SynchronizerState::Ready(start_msg.header.clone())),
949 ]
950 .into_iter()
951 .collect(),
952 };
953 let exp2 = FeedMessage {
954 state_msgs: [
955 ("uniswap-v2".to_string(), second_msg.clone()),
956 ("uniswap-v3".to_string(), second_msg.clone()),
957 ]
958 .into_iter()
959 .collect(),
960 sync_states: [
961 ("uniswap-v3".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
962 ("uniswap-v2".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
963 ]
964 .into_iter()
965 .collect(),
966 };
967 assert_eq!(first_feed_msg, exp1);
968 assert_eq!(second_feed_msg, exp2);
969 }
970
971 #[test(tokio::test)]
972 async fn test_delayed_synchronizer_catches_up() {
973 let v2_sync = MockStateSync::new();
974 let v3_sync = MockStateSync::new();
975 let block_sync = BlockSynchronizer::with_short_timeouts()
976 .register_synchronizer(
977 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
978 v2_sync.clone(),
979 )
980 .register_synchronizer(
981 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
982 v3_sync.clone(),
983 );
984
985 let block1_msg = StateSyncMessage {
987 header: BlockHeader {
988 number: 1,
989 hash: Bytes::from(vec![1]),
990 parent_hash: Bytes::from(vec![0]),
991 revert: false,
992 ..Default::default()
993 },
994 ..Default::default()
995 };
996 v2_sync
997 .send_header(block1_msg.clone())
998 .await
999 .expect("send_header failed");
1000 v3_sync
1001 .send_header(block1_msg.clone())
1002 .await
1003 .expect("send_header failed");
1004
1005 let (_jh, mut rx) = block_sync
1007 .run()
1008 .await
1009 .expect("BlockSynchronizer failed to start.");
1010
1011 let first_feed_msg = rx
1013 .recv()
1014 .await
1015 .expect("header channel was closed");
1016 assert_eq!(first_feed_msg.state_msgs.len(), 2);
1017 assert!(matches!(
1018 first_feed_msg
1019 .sync_states
1020 .get("uniswap-v2")
1021 .unwrap(),
1022 SynchronizerState::Ready(_)
1023 ));
1024 assert!(matches!(
1025 first_feed_msg
1026 .sync_states
1027 .get("uniswap-v3")
1028 .unwrap(),
1029 SynchronizerState::Ready(_)
1030 ));
1031
1032 let block2_msg = StateSyncMessage {
1034 header: BlockHeader {
1035 number: 2,
1036 hash: Bytes::from(vec![2]),
1037 parent_hash: Bytes::from(vec![1]),
1038 revert: false,
1039 ..Default::default()
1040 },
1041 ..Default::default()
1042 };
1043 v2_sync
1044 .send_header(block2_msg.clone())
1045 .await
1046 .expect("send_header failed");
1047
1048 let second_feed_msg = rx
1050 .recv()
1051 .await
1052 .expect("header channel was closed");
1053 debug!("Consumed second message for v2");
1054
1055 assert!(second_feed_msg
1056 .state_msgs
1057 .contains_key("uniswap-v2"));
1058 assert!(matches!(
1059 second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1060 SynchronizerState::Ready(header) if header.number == 2
1061 ));
1062 assert!(!second_feed_msg
1063 .state_msgs
1064 .contains_key("uniswap-v3"));
1065 assert!(matches!(
1066 second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1067 SynchronizerState::Delayed(header) if header.number == 1
1068 ));
1069
1070 v3_sync
1072 .send_header(block2_msg.clone())
1073 .await
1074 .expect("send_header failed");
1075
1076 let block3_msg = StateSyncMessage {
1078 header: BlockHeader {
1079 number: 3,
1080 hash: Bytes::from(vec![3]),
1081 parent_hash: Bytes::from(vec![2]),
1082 revert: false,
1083 ..Default::default()
1084 },
1085 ..Default::default()
1086 };
1087 v2_sync
1088 .send_header(block3_msg.clone())
1089 .await
1090 .expect("send_header failed");
1091 v3_sync
1092 .send_header(block3_msg)
1093 .await
1094 .expect("send_header failed");
1095
1096 let mut third_feed_msg = rx
1099 .recv()
1100 .await
1101 .expect("header channel was closed");
1102
1103 if !third_feed_msg
1106 .state_msgs
1107 .contains_key("uniswap-v2")
1108 {
1109 third_feed_msg = rx
1110 .recv()
1111 .await
1112 .expect("header channel was closed");
1113 }
1114 assert!(third_feed_msg
1115 .state_msgs
1116 .contains_key("uniswap-v2"));
1117 assert!(third_feed_msg
1118 .state_msgs
1119 .contains_key("uniswap-v3"));
1120 assert!(matches!(
1121 third_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1122 SynchronizerState::Ready(header) if header.number == 3
1123 ));
1124 assert!(matches!(
1125 third_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1126 SynchronizerState::Ready(header) if header.number == 3
1127 ));
1128 }
1129
1130 #[test(tokio::test)]
1131 async fn test_different_start_blocks() {
1132 let v2_sync = MockStateSync::new();
1133 let v3_sync = MockStateSync::new();
1134 let block_sync = BlockSynchronizer::with_short_timeouts()
1135 .register_synchronizer(
1136 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
1137 v2_sync.clone(),
1138 )
1139 .register_synchronizer(
1140 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
1141 v3_sync.clone(),
1142 );
1143
1144 let block1_msg = StateSyncMessage {
1146 header: BlockHeader {
1147 number: 1,
1148 hash: Bytes::from(vec![1]),
1149 parent_hash: Bytes::from(vec![0]),
1150 revert: false,
1151 ..Default::default()
1152 },
1153 ..Default::default()
1154 };
1155 let block2_msg = StateSyncMessage {
1156 header: BlockHeader {
1157 number: 2,
1158 hash: Bytes::from(vec![2]),
1159 parent_hash: Bytes::from(vec![1]),
1160 revert: false,
1161 ..Default::default()
1162 },
1163 ..Default::default()
1164 };
1165
1166 let _ = v2_sync
1167 .send_header(block1_msg.clone())
1168 .await;
1169 v3_sync
1170 .send_header(block2_msg.clone())
1171 .await
1172 .expect("send_header failed");
1173
1174 let (_jh, mut rx) = block_sync
1176 .run()
1177 .await
1178 .expect("BlockSynchronizer failed to start.");
1179
1180 let first_feed_msg = rx
1182 .recv()
1183 .await
1184 .expect("header channel was closed");
1185 assert!(matches!(
1186 first_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1187 SynchronizerState::Delayed(header) if header.number == 1
1188 ));
1189 assert!(matches!(
1190 first_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1191 SynchronizerState::Ready(header) if header.number == 2
1192 ));
1193
1194 v2_sync
1196 .send_header(block2_msg.clone())
1197 .await
1198 .expect("send_header failed");
1199
1200 let block3_msg = StateSyncMessage {
1202 header: BlockHeader {
1203 number: 3,
1204 hash: Bytes::from(vec![3]),
1205 parent_hash: Bytes::from(vec![2]),
1206 revert: false,
1207 ..Default::default()
1208 },
1209 ..Default::default()
1210 };
1211 let _ = v2_sync
1212 .send_header(block3_msg.clone())
1213 .await;
1214 v3_sync
1215 .send_header(block3_msg.clone())
1216 .await
1217 .expect("send_header failed");
1218
1219 let second_feed_msg = rx
1221 .recv()
1222 .await
1223 .expect("header channel was closed");
1224 assert_eq!(second_feed_msg.state_msgs.len(), 2);
1225 assert!(matches!(
1226 second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
1227 SynchronizerState::Ready(header) if header.number == 3
1228 ));
1229 assert!(matches!(
1230 second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
1231 SynchronizerState::Ready(header) if header.number == 3
1232 ));
1233 }
1234
1235 #[test(tokio::test)]
1236 async fn test_synchronizer_task_failure_triggers_cleanup() {
1237 let v2_sync = MockStateSync::with_behavior(MockBehavior::ExitImmediately);
1241 let v3_sync = MockStateSync::new(); let block_sync = BlockSynchronizer::with_short_timeouts()
1244 .register_synchronizer(
1245 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
1246 v2_sync.clone(),
1247 )
1248 .register_synchronizer(
1249 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
1250 v3_sync.clone(),
1251 );
1252
1253 let start_msg = StateSyncMessage {
1255 header: BlockHeader { number: 1, ..Default::default() },
1256 ..Default::default()
1257 };
1258 v2_sync
1259 .send_header(start_msg.clone())
1260 .await
1261 .expect("send_header failed");
1262 v3_sync
1263 .send_header(start_msg.clone())
1264 .await
1265 .expect("send_header failed");
1266
1267 let (nanny_handle, mut sync_rx) = block_sync
1269 .run()
1270 .await
1271 .expect("BlockSynchronizer failed to start");
1272
1273 let first_msg = sync_rx
1275 .recv()
1276 .await
1277 .expect("Should receive first message");
1278 assert!(!first_msg.state_msgs.is_empty());
1280
1281 let result = timeout(Duration::from_secs(2), nanny_handle).await;
1283 assert!(result.is_ok(), "Nanny should complete when synchronizer task exits");
1284
1285 tokio::time::sleep(Duration::from_millis(50)).await;
1287
1288 assert!(
1290 v3_sync.was_close_received().await,
1291 "v3_sync should have received close signal during cleanup"
1292 );
1293 }
1294
1295 #[test(tokio::test)]
1296 async fn test_synchronizer_task_exit_triggers_cleanup() {
1297 let v2_sync = MockStateSync::with_behavior(MockBehavior::FailOnExit);
1301 let v3_sync = MockStateSync::new();
1302
1303 let block_sync = BlockSynchronizer::with_short_timeouts()
1304 .register_synchronizer(
1305 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
1306 v2_sync.clone(),
1307 )
1308 .register_synchronizer(
1309 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
1310 v3_sync.clone(),
1311 );
1312
1313 let start_msg = StateSyncMessage {
1315 header: BlockHeader { number: 1, ..Default::default() },
1316 ..Default::default()
1317 };
1318 v2_sync
1319 .send_header(start_msg.clone())
1320 .await
1321 .expect("send_header failed");
1322 v3_sync
1323 .send_header(start_msg.clone())
1324 .await
1325 .expect("send_header failed");
1326
1327 let (nanny_handle, mut sync_rx) = block_sync
1329 .run()
1330 .await
1331 .expect("BlockSynchronizer failed to start");
1332
1333 let first_msg = sync_rx
1335 .recv()
1336 .await
1337 .expect("Should receive first message");
1338 assert_eq!(first_msg.state_msgs.len(), 2);
1339
1340 v2_sync.trigger_close().await;
1343
1344 let result = timeout(Duration::from_secs(2), nanny_handle).await;
1346 assert!(result.is_ok(), "Nanny should complete when synchronizer task exits");
1347
1348 tokio::time::sleep(Duration::from_millis(50)).await;
1350
1351 assert!(
1353 v3_sync.was_close_received().await,
1354 "v3_sync should have received close signal during cleanup"
1355 );
1356 }
1357
1358 #[test(tokio::test)]
1359 async fn test_main_loop_timeout_triggers_cleanup() {
1360 let v2_sync = MockStateSync::new();
1364 let v3_sync = MockStateSync::new();
1365
1366 let block_sync = BlockSynchronizer::with_short_timeouts()
1367 .register_synchronizer(
1368 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
1369 v2_sync.clone(),
1370 )
1371 .register_synchronizer(
1372 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
1373 v3_sync.clone(),
1374 );
1375
1376 let start_msg = StateSyncMessage {
1378 header: BlockHeader { number: 1, ..Default::default() },
1379 ..Default::default()
1380 };
1381 v2_sync
1382 .send_header(start_msg.clone())
1383 .await
1384 .expect("send_header failed");
1385 v3_sync
1386 .send_header(start_msg.clone())
1387 .await
1388 .expect("send_header failed");
1389
1390 let (nanny_handle, mut sync_rx) = block_sync
1392 .run()
1393 .await
1394 .expect("BlockSynchronizer failed to start");
1395
1396 let first_msg = sync_rx
1398 .recv()
1399 .await
1400 .expect("Should receive first message");
1401 assert_eq!(first_msg.state_msgs.len(), 2);
1402
1403 let result = timeout(Duration::from_secs(3), nanny_handle).await;
1408 assert!(
1409 result.is_ok(),
1410 "Nanny should complete when main loop errors due to no ready synchronizers"
1411 );
1412
1413 tokio::time::sleep(Duration::from_millis(50)).await;
1415
1416 assert!(
1418 v2_sync.was_close_received().await,
1419 "v2_sync should have received close signal during cleanup"
1420 );
1421 assert!(
1422 v3_sync.was_close_received().await,
1423 "v3_sync should have received close signal during cleanup"
1424 );
1425 }
1426
1427 #[test(tokio::test)]
1428 async fn test_cleanup_timeout_warning() {
1429 let v2_sync = MockStateSync::with_behavior(MockBehavior::ExitImmediately);
1433 let v3_sync = MockStateSync::with_behavior(MockBehavior::IgnoreClose);
1434
1435 let block_sync = BlockSynchronizer::with_short_timeouts()
1436 .register_synchronizer(
1437 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
1438 v2_sync.clone(),
1439 )
1440 .register_synchronizer(
1441 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
1442 v3_sync.clone(),
1443 );
1444
1445 let start_msg = StateSyncMessage {
1447 header: BlockHeader { number: 1, ..Default::default() },
1448 ..Default::default()
1449 };
1450 v2_sync
1451 .send_header(start_msg.clone())
1452 .await
1453 .expect("send_header failed");
1454 v3_sync
1455 .send_header(start_msg.clone())
1456 .await
1457 .expect("send_header failed");
1458
1459 let (nanny_handle, mut sync_rx) = block_sync
1462 .run()
1463 .await
1464 .expect("BlockSynchronizer failed to start");
1465
1466 let _ = sync_rx.recv().await;
1468
1469 let result = timeout(Duration::from_secs(10), nanny_handle).await;
1471 assert!(
1472 result.is_ok(),
1473 "Nanny should complete even when some synchronizers timeout during cleanup"
1474 );
1475
1476 }
1480
1481 #[test(tokio::test)]
1482 async fn test_one_synchronizer_goes_stale_while_other_works() {
1483 let v2_sync = MockStateSync::new();
1486 let v3_sync = MockStateSync::new();
1487
1488 let mut block_sync = BlockSynchronizer::new(
1490 Duration::from_millis(20), Duration::from_millis(10), 2, );
1494 block_sync.max_messages(5); let block_sync = block_sync
1497 .register_synchronizer(
1498 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
1499 v2_sync.clone(),
1500 )
1501 .register_synchronizer(
1502 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
1503 v3_sync.clone(),
1504 );
1505
1506 let block1_msg = StateSyncMessage {
1508 header: BlockHeader {
1509 number: 1,
1510 hash: Bytes::from(vec![1]),
1511 parent_hash: Bytes::from(vec![0]),
1512 revert: false,
1513 timestamp: 1000,
1514 },
1515 ..Default::default()
1516 };
1517 let _ = v2_sync
1518 .send_header(block1_msg.clone())
1519 .await;
1520 let _ = v3_sync
1521 .send_header(block1_msg.clone())
1522 .await;
1523
1524 let (nanny_handle, mut rx) = block_sync
1526 .run()
1527 .await
1528 .expect("BlockSynchronizer failed to start");
1529
1530 let first_feed_msg = rx
1532 .recv()
1533 .await
1534 .expect("Should receive first message");
1535 assert_eq!(first_feed_msg.state_msgs.len(), 2);
1536 assert!(matches!(
1537 first_feed_msg
1538 .sync_states
1539 .get("uniswap-v2")
1540 .unwrap(),
1541 SynchronizerState::Ready(_)
1542 ));
1543 assert!(matches!(
1544 first_feed_msg
1545 .sync_states
1546 .get("uniswap-v3")
1547 .unwrap(),
1548 SynchronizerState::Ready(_)
1549 ));
1550
1551 let block2_msg = StateSyncMessage {
1553 header: BlockHeader {
1554 number: 2,
1555 hash: Bytes::from(vec![2]),
1556 parent_hash: Bytes::from(vec![1]),
1557 revert: false,
1558 timestamp: 2000,
1559 },
1560 ..Default::default()
1561 };
1562 let _ = v3_sync
1563 .send_header(block2_msg.clone())
1564 .await;
1565 let second_feed_msg = rx
1569 .recv()
1570 .await
1571 .expect("Should receive second message");
1572 assert!(second_feed_msg
1573 .state_msgs
1574 .contains_key("uniswap-v3"));
1575 assert!(!second_feed_msg
1576 .state_msgs
1577 .contains_key("uniswap-v2"));
1578 assert!(matches!(
1579 second_feed_msg
1580 .sync_states
1581 .get("uniswap-v3")
1582 .unwrap(),
1583 SynchronizerState::Ready(_)
1584 ));
1585 if let Some(v2_state) = second_feed_msg
1587 .sync_states
1588 .get("uniswap-v2")
1589 {
1590 if matches!(v2_state, SynchronizerState::Delayed(_)) {
1591 assert!(
1593 !nanny_handle.is_finished(),
1594 "Nanny should still be running when synchronizer is delayed (not stale yet)"
1595 );
1596 }
1597 }
1598
1599 tokio::time::sleep(Duration::from_millis(15)).await;
1601
1602 let block3_msg = StateSyncMessage {
1604 header: BlockHeader {
1605 number: 3,
1606 hash: Bytes::from(vec![3]),
1607 parent_hash: Bytes::from(vec![2]),
1608 revert: false,
1609 timestamp: 3000,
1610 },
1611 ..Default::default()
1612 };
1613 let _ = v3_sync
1614 .send_header(block3_msg.clone())
1615 .await;
1616
1617 tokio::time::sleep(Duration::from_millis(100)).await;
1619
1620 let mut found_removed = false;
1622
1623 for _ in 0..3 {
1624 if let Some(msg) = rx.recv().await {
1625 if !msg
1626 .sync_states
1627 .contains_key("uniswap-v2")
1628 {
1629 found_removed = true;
1631 }
1632
1633 if let Some(v3_state) = msg.sync_states.get("uniswap-v3") {
1635 assert!(
1636 !matches!(v3_state, SynchronizerState::Stale(_) | SynchronizerState::Ended),
1637 "v3 should not be stale or ended, but was: {v3_state:?}"
1638 );
1639 }
1640
1641 if found_removed {
1642 break;
1643 }
1644 } else {
1645 break;
1646 }
1647 }
1648
1649 assert!(found_removed, "v2 synchronizer should be removed after going stale");
1651 }
1653
1654 #[test(tokio::test)]
1655 async fn test_all_synchronizers_go_stale_main_loop_exits() {
1656 let v2_sync = MockStateSync::new();
1659 let v3_sync = MockStateSync::new();
1660
1661 let mut block_sync = BlockSynchronizer::new(
1663 Duration::from_millis(20), Duration::from_millis(10), 3, );
1667 block_sync.max_messages(10); let block_sync = block_sync
1670 .register_synchronizer(
1671 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
1672 v2_sync.clone(),
1673 )
1674 .register_synchronizer(
1675 ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
1676 v3_sync.clone(),
1677 );
1678
1679 let block1_msg = StateSyncMessage {
1681 header: BlockHeader {
1682 number: 1,
1683 hash: Bytes::from(vec![1]),
1684 parent_hash: Bytes::from(vec![0]),
1685 revert: false,
1686 timestamp: 1000,
1687 },
1688 ..Default::default()
1689 };
1690 let _ = v2_sync
1691 .send_header(block1_msg.clone())
1692 .await;
1693 let _ = v3_sync
1694 .send_header(block1_msg.clone())
1695 .await;
1696
1697 let (nanny_handle, mut rx) = block_sync
1699 .run()
1700 .await
1701 .expect("BlockSynchronizer failed to start");
1702
1703 let first_feed_msg = rx
1705 .recv()
1706 .await
1707 .expect("Should receive first message");
1708 assert_eq!(first_feed_msg.state_msgs.len(), 2);
1709 assert!(matches!(
1710 first_feed_msg
1711 .sync_states
1712 .get("uniswap-v2")
1713 .unwrap(),
1714 SynchronizerState::Ready(_)
1715 ));
1716 assert!(matches!(
1717 first_feed_msg
1718 .sync_states
1719 .get("uniswap-v3")
1720 .unwrap(),
1721 SynchronizerState::Ready(_)
1722 ));
1723
1724 let mut seen_delayed = false;
1729
1730 let timeout_duration = Duration::from_millis(500); let start_time = tokio::time::Instant::now();
1734
1735 while let Ok(Some(msg)) = tokio::time::timeout(Duration::from_millis(50), rx.recv()).await {
1736 if !seen_delayed {
1738 let v2_state = msg.sync_states.get("uniswap-v2");
1739 let v3_state = msg.sync_states.get("uniswap-v3");
1740
1741 if matches!(v2_state, Some(SynchronizerState::Delayed(_))) ||
1742 matches!(v3_state, Some(SynchronizerState::Delayed(_)))
1743 {
1744 seen_delayed = true;
1745 assert!(!nanny_handle.is_finished(),
1747 "Nanny should still be running when synchronizers are delayed (not stale yet)");
1748 break;
1750 }
1751 }
1752
1753 if start_time.elapsed() > timeout_duration {
1755 break;
1756 }
1757 }
1758
1759 tokio::time::sleep(Duration::from_millis(200)).await;
1761
1762 while rx.recv().await.is_some() {
1764 }
1766 let nanny_result = timeout(Duration::from_secs(2), nanny_handle).await;
1770 assert!(nanny_result.is_ok(), "Nanny should complete when main loop exits");
1771
1772 assert!(seen_delayed, "Synchronizers should transition to Delayed state first");
1774 tokio::time::sleep(Duration::from_millis(50)).await;
1782
1783 assert!(
1785 v2_sync.was_close_received().await,
1786 "v2_sync should have received close signal during cleanup"
1787 );
1788 assert!(
1789 v3_sync.was_close_received().await,
1790 "v3_sync should have received close signal during cleanup"
1791 );
1792 }
1793}