use std::{
collections::{HashMap, HashSet},
fmt::{Display, Formatter},
time::Duration,
};
use chrono::{Duration as ChronoDuration, Local, NaiveDateTime};
use futures03::{
future::{join_all, try_join_all},
stream::FuturesUnordered,
StreamExt,
};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::{
sync::{
mpsc::{self, Receiver},
oneshot,
},
task::JoinHandle,
time::timeout,
};
use tracing::{debug, error, info, trace, warn};
use tycho_common::{
display::opt,
dto::{BlockChanges, ExtractorIdentity},
Bytes,
};
use crate::feed::{
block_history::{BlockHistory, BlockHistoryError, BlockPosition},
synchronizer::{StateSyncMessage, StateSynchronizer, SyncResult, SynchronizerError},
};
mod block_history;
pub mod component_tracker;
pub mod synchronizer;
pub trait HeaderLike {
fn block(self) -> Option<BlockHeader>;
fn block_number_or_timestamp(self) -> u64;
}
#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, Eq, Hash)]
pub struct BlockHeader {
pub hash: Bytes,
pub number: u64,
pub parent_hash: Bytes,
pub revert: bool,
pub timestamp: u64,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub partial_block_index: Option<u32>,
}
impl BlockHeader {
fn is_partial(&self) -> bool {
self.partial_block_index.is_some()
}
}
impl Display for BlockHeader {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let short_hash = if self.hash.len() >= 4 {
hex::encode(&self.hash[..4]) } else {
hex::encode(&self.hash)
};
match self.partial_block_index {
Some(idx) => write!(f, "Block #{} [0x{}..] (partial {})", self.number, short_hash, idx),
None => write!(f, "Block #{} [0x{}..]", self.number, short_hash),
}
}
}
impl From<&BlockChanges> for BlockHeader {
fn from(block_changes: &BlockChanges) -> Self {
let block = &block_changes.block;
Self {
hash: block.hash.clone(),
number: block.number,
parent_hash: block.parent_hash.clone(),
revert: block_changes.revert,
timestamp: block.ts.and_utc().timestamp() as u64,
partial_block_index: block_changes.partial_block_index,
}
}
}
impl HeaderLike for BlockHeader {
fn block(self) -> Option<BlockHeader> {
Some(self)
}
fn block_number_or_timestamp(self) -> u64 {
self.number
}
}
#[derive(Error, Debug)]
pub enum BlockSynchronizerError {
#[error("Failed to initialize synchronizer: {0}")]
InitializationError(#[from] SynchronizerError),
#[error("Failed to process new block: {0}")]
BlockHistoryError(#[from] BlockHistoryError),
#[error("Not a single synchronizer was ready: {0}")]
NoReadySynchronizers(String),
#[error("No synchronizers were set")]
NoSynchronizers,
#[error("Failed to convert duration: {0}")]
DurationConversionError(String),
}
type BlockSyncResult<T> = Result<T, BlockSynchronizerError>;
pub struct BlockSynchronizer<S> {
synchronizers: Option<HashMap<ExtractorIdentity, S>>,
block_time: std::time::Duration,
latency_buffer: std::time::Duration,
startup_timeout: std::time::Duration,
max_messages: Option<usize>,
max_missed_blocks: u64,
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "lowercase")]
pub enum SynchronizerState {
Started,
Ready(BlockHeader),
Delayed(BlockHeader),
Stale(BlockHeader),
Advanced(BlockHeader),
Ended(String),
}
impl Display for SynchronizerState {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
SynchronizerState::Started => write!(f, "Started"),
SynchronizerState::Ready(b) => write!(f, "Started({})", b.number),
SynchronizerState::Delayed(b) => write!(f, "Delayed({})", b.number),
SynchronizerState::Stale(b) => write!(f, "Stale({})", b.number),
SynchronizerState::Advanced(b) => write!(f, "Advanced({})", b.number),
SynchronizerState::Ended(reason) => write!(f, "Ended({})", reason),
}
}
}
pub struct SynchronizerStream {
extractor_id: ExtractorIdentity,
state: SynchronizerState,
error: Option<SynchronizerError>,
modify_ts: NaiveDateTime,
rx: Receiver<SyncResult<StateSyncMessage<BlockHeader>>>,
}
impl SynchronizerStream {
fn new(
extractor_id: &ExtractorIdentity,
rx: Receiver<SyncResult<StateSyncMessage<BlockHeader>>>,
) -> Self {
Self {
extractor_id: extractor_id.clone(),
state: SynchronizerState::Started,
error: None,
modify_ts: Local::now().naive_utc(),
rx,
}
}
async fn try_advance(
&mut self,
block_history: &BlockHistory,
block_time: std::time::Duration,
latency_buffer: std::time::Duration,
stale_threshold: std::time::Duration,
) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
let extractor_id = self.extractor_id.clone();
let latest_block = block_history.latest();
match &self.state {
SynchronizerState::Started | SynchronizerState::Ended(_) => {
warn!(state=?&self.state, "Advancing Synchronizer in this state not supported!");
Ok(None)
}
SynchronizerState::Advanced(b) => {
let future_block = b.clone();
self.transition(future_block, block_history, stale_threshold)?;
Ok(None)
}
SynchronizerState::Ready(previous_block) => {
self.try_recv_next_expected(
block_time + latency_buffer,
block_history,
previous_block.clone(),
stale_threshold,
)
.await
}
SynchronizerState::Delayed(old_block) => {
debug!(
%old_block,
latest_block=opt(&latest_block),
%extractor_id,
"Trying to catch up to latest block"
);
self.try_catch_up(block_history, block_time + latency_buffer, stale_threshold)
.await
}
SynchronizerState::Stale(old_block) => {
debug!(
%old_block,
latest_block=opt(&latest_block),
%extractor_id,
"Trying to catch up stale synchronizer to latest block"
);
self.try_catch_up(block_history, block_time, stale_threshold)
.await
}
}
}
async fn try_recv_next_expected(
&mut self,
max_wait: std::time::Duration,
block_history: &BlockHistory,
previous_block: BlockHeader,
stale_threshold: std::time::Duration,
) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
let extractor_id = self.extractor_id.clone();
match timeout(max_wait, self.rx.recv()).await {
Ok(Some(Ok(msg))) => {
self.transition(msg.header.clone(), block_history, stale_threshold)?;
Ok(Some(msg))
}
Ok(Some(Err(e))) => {
self.mark_errored(e);
Ok(None)
}
Ok(None) => {
warn!(
%extractor_id,
"Tried to poll from closed synchronizer.",
);
self.mark_closed();
Ok(None)
}
Err(_) => {
debug!(%extractor_id, %previous_block, "No block received within time limit.");
self.state = SynchronizerState::Delayed(previous_block.clone());
self.modify_ts = Local::now().naive_utc();
Ok(None)
}
}
}
async fn try_catch_up(
&mut self,
block_history: &BlockHistory,
max_wait: std::time::Duration,
stale_threshold: std::time::Duration,
) -> BlockSyncResult<Option<StateSyncMessage<BlockHeader>>> {
let mut results = Vec::new();
let extractor_id = self.extractor_id.clone();
let deadline = std::time::Instant::now() + max_wait;
while std::time::Instant::now() < deadline {
match timeout(
deadline.saturating_duration_since(std::time::Instant::now()),
self.rx.recv(),
)
.await
{
Ok(Some(Ok(msg))) => {
debug!(%extractor_id, block=%msg.header, "Received new message during catch-up");
let block_pos = block_history.determine_block_position(&msg.header)?;
results.push(msg);
if matches!(block_pos, BlockPosition::NextExpected | BlockPosition::NextPartial)
{
break;
}
}
Ok(Some(Err(e))) => {
self.mark_errored(e);
return Ok(None);
}
Ok(None) => {
warn!(
%extractor_id,
"Tried to poll from closed synchronizer during catch up.",
);
self.mark_closed();
return Ok(None);
}
Err(_) => {
debug!(%extractor_id, "Timed out waiting for catch-up");
break;
}
}
}
let merged = results
.into_iter()
.reduce(|l, r| l.merge(r));
if let Some(msg) = merged {
debug!(%extractor_id, "Delayed extractor made progress!");
self.transition(msg.header.clone(), block_history, stale_threshold)?;
Ok(Some(msg))
} else {
self.check_and_transition_to_stale_if_needed(stale_threshold, None)?;
Ok(None)
}
}
fn check_and_transition_to_stale_if_needed(
&mut self,
stale_threshold: std::time::Duration,
fallback_header: Option<BlockHeader>,
) -> Result<bool, BlockSynchronizerError> {
let now = Local::now().naive_utc();
let wait_duration = now.signed_duration_since(self.modify_ts);
let stale_threshold_chrono = ChronoDuration::from_std(stale_threshold)
.map_err(|e| BlockSynchronizerError::DurationConversionError(e.to_string()))?;
if wait_duration > stale_threshold_chrono {
let header_to_use = match (&self.state, fallback_header) {
(SynchronizerState::Ready(h), _) |
(SynchronizerState::Delayed(h), _) |
(SynchronizerState::Stale(h), _) => h.clone(),
(_, Some(h)) => h,
_ => BlockHeader::default(),
};
warn!(
extractor_id=%self.extractor_id,
last_message_at=?self.modify_ts,
"SynchronizerStream transition to stale due to timeout."
);
self.state = SynchronizerState::Stale(header_to_use);
self.modify_ts = now;
Ok(true)
} else {
Ok(false)
}
}
fn transition(
&mut self,
latest_retrieved: BlockHeader,
block_history: &BlockHistory,
stale_threshold: std::time::Duration,
) -> Result<(), BlockSynchronizerError> {
let extractor_id = self.extractor_id.clone();
let last_message_at = self.modify_ts;
let block = &latest_retrieved;
match block_history.determine_block_position(&latest_retrieved)? {
BlockPosition::NextExpected | BlockPosition::NextPartial => {
self.state = SynchronizerState::Ready(latest_retrieved.clone());
trace!(
next = %latest_retrieved,
extractor = %extractor_id,
"SynchronizerStream transition to next expected"
)
}
BlockPosition::Latest | BlockPosition::Delayed => {
if !self.check_and_transition_to_stale_if_needed(
stale_threshold,
Some(latest_retrieved.clone()),
)? {
warn!(
%extractor_id,
?last_message_at,
%block,
"SynchronizerStream transition transition to delayed."
);
self.state = SynchronizerState::Delayed(latest_retrieved.clone());
}
}
BlockPosition::Advanced => {
info!(
%extractor_id,
?last_message_at,
latest = opt(&block_history.latest()),
%block,
"SynchronizerStream transition to advanced."
);
self.state = SynchronizerState::Advanced(latest_retrieved.clone());
}
}
self.modify_ts = Local::now().naive_utc();
Ok(())
}
fn mark_errored(&mut self, error: SynchronizerError) {
self.state = SynchronizerState::Ended(error.to_string());
self.modify_ts = Local::now().naive_utc();
self.error = Some(error);
}
fn mark_closed(&mut self) {
if !matches!(self.state, SynchronizerState::Ended(_)) {
self.state = SynchronizerState::Ended("Closed".to_string());
self.modify_ts = Local::now().naive_utc();
}
}
fn mark_stale(&mut self, header: &BlockHeader) {
self.state = SynchronizerState::Stale(header.clone());
self.modify_ts = Local::now().naive_utc();
}
fn mark_ready(&mut self, header: &BlockHeader) {
self.state = SynchronizerState::Ready(header.clone());
self.modify_ts = Local::now().naive_utc();
}
fn has_ended(&self) -> bool {
matches!(self.state, SynchronizerState::Ended(_))
}
fn is_stale(&self) -> bool {
matches!(self.state, SynchronizerState::Stale(_))
}
fn is_advanced(&self) -> bool {
matches!(self.state, SynchronizerState::Advanced(_))
}
fn get_current_header(&self) -> Option<&BlockHeader> {
match &self.state {
SynchronizerState::Ready(b) |
SynchronizerState::Delayed(b) |
SynchronizerState::Advanced(b) => Some(b),
_ => None,
}
}
}
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct FeedMessage<H = BlockHeader>
where
H: HeaderLike,
{
pub state_msgs: HashMap<String, StateSyncMessage<H>>,
pub sync_states: HashMap<String, SynchronizerState>,
}
impl<H> FeedMessage<H>
where
H: HeaderLike,
{
fn new(
state_msgs: HashMap<String, StateSyncMessage<H>>,
sync_states: HashMap<String, SynchronizerState>,
) -> Self {
Self { state_msgs, sync_states }
}
}
impl<S> BlockSynchronizer<S>
where
S: StateSynchronizer,
{
pub fn new(
block_time: std::time::Duration,
latency_buffer: std::time::Duration,
max_missed_blocks: u64,
) -> Self {
Self {
synchronizers: None,
max_messages: None,
block_time,
latency_buffer,
startup_timeout: block_time.mul_f64(max_missed_blocks as f64),
max_missed_blocks,
}
}
pub fn max_messages(&mut self, val: usize) {
self.max_messages = Some(val);
}
pub fn startup_timeout(mut self, val: Duration) {
self.startup_timeout = val;
}
pub fn register_synchronizer(mut self, id: ExtractorIdentity, synchronizer: S) -> Self {
let mut registered = self.synchronizers.unwrap_or_default();
registered.insert(id, synchronizer);
self.synchronizers = Some(registered);
self
}
#[cfg(test)]
pub fn with_short_timeouts() -> Self {
Self::new(Duration::from_millis(10), Duration::from_millis(10), 3)
}
async fn cleanup_synchronizers(
mut state_sync_tasks: FuturesUnordered<JoinHandle<()>>,
sync_close_senders: Vec<oneshot::Sender<()>>,
) {
for close_sender in sync_close_senders {
let _ = close_sender.send(());
}
let mut completed_tasks = 0;
while let Ok(Some(_)) = timeout(Duration::from_secs(5), state_sync_tasks.next()).await {
completed_tasks += 1;
}
let remaining_tasks = state_sync_tasks.len();
if remaining_tasks > 0 {
warn!(
completed = completed_tasks,
timed_out = remaining_tasks,
"Some synchronizers timed out during cleanup and may not have shut down cleanly"
);
}
}
pub async fn run(
mut self,
) -> BlockSyncResult<(JoinHandle<()>, Receiver<BlockSyncResult<FeedMessage<BlockHeader>>>)>
{
trace!("Starting BlockSynchronizer...");
let state_sync_tasks = FuturesUnordered::new();
let mut synchronizers = self
.synchronizers
.take()
.ok_or(BlockSynchronizerError::NoSynchronizers)?;
let init_tasks = synchronizers
.values_mut()
.map(|s| s.initialize())
.collect::<Vec<_>>();
try_join_all(init_tasks).await?;
let mut sync_streams = Vec::with_capacity(synchronizers.len());
let mut sync_close_senders = Vec::new();
for (extractor_id, synchronizer) in synchronizers.drain() {
let (handle, rx) = synchronizer.start().await;
let (join_handle, close_sender) = handle.split();
state_sync_tasks.push(join_handle);
sync_close_senders.push(close_sender);
sync_streams.push(SynchronizerStream::new(&extractor_id, rx));
}
debug!("Waiting for initial synchronizer messages...");
let mut startup_futures = Vec::new();
for synchronizer in sync_streams.iter_mut() {
let fut = async {
let res = timeout(self.startup_timeout, synchronizer.rx.recv()).await;
(synchronizer, res)
};
startup_futures.push(fut);
}
let mut ready_sync_msgs = HashMap::new();
let initial_headers = join_all(startup_futures)
.await
.into_iter()
.filter_map(|(synchronizer, res)| {
let extractor_id = synchronizer.extractor_id.clone();
match res {
Ok(Some(Ok(msg))) => {
debug!(%extractor_id, height=?&msg.header.number, "Synchronizer started successfully!");
synchronizer.mark_ready(&msg.header);
let header = msg.header.clone();
ready_sync_msgs.insert(extractor_id.name.clone(), msg);
Some(header)
}
Ok(Some(Err(e))) => {
synchronizer.mark_errored(e);
None
}
Ok(None) => {
warn!(%extractor_id, "Synchronizer closed during startup");
synchronizer.mark_closed();
None
}
Err(_) => {
warn!(%extractor_id, "Timed out waiting for first message");
synchronizer.mark_stale(&BlockHeader::default());
None
}
}
})
.collect::<HashSet<_>>() .into_iter()
.collect::<Vec<_>>();
Self::check_streams(&sync_streams)?;
let mut block_history = BlockHistory::new(initial_headers, 15)?;
let start_header = block_history
.latest()
.ok_or(BlockHistoryError::EmptyHistory)?;
info!(
start_block=%start_header,
n_healthy=ready_sync_msgs.len(),
n_total=sync_streams.len(),
"Block synchronization started successfully!"
);
for stream in sync_streams.iter_mut() {
if let SynchronizerState::Ready(header) = stream.state.clone() {
if header.number < start_header.number {
debug!(
extractor_id=%stream.extractor_id,
synchronizer_block=header.number,
current_block=start_header.number,
"Marking synchronizer as delayed during initialization"
);
stream.state = SynchronizerState::Delayed(header);
}
}
}
let (sync_tx, sync_rx) = mpsc::channel(30);
let main_loop_jh = tokio::spawn(async move {
let mut n_iter = 1;
loop {
let msg = FeedMessage::new(
std::mem::take(&mut ready_sync_msgs),
sync_streams
.iter()
.map(|stream| (stream.extractor_id.name.to_string(), stream.state.clone()))
.collect(),
);
if sync_tx.send(Ok(msg)).await.is_err() {
info!("Receiver closed, block synchronizer terminating..");
return;
};
if let Some(max_messages) = self.max_messages {
if n_iter >= max_messages {
info!(max_messages, "StreamEnd");
return;
}
}
n_iter += 1;
let res = self
.handle_next_message(
&mut sync_streams,
&mut ready_sync_msgs,
&mut block_history,
)
.await;
if let Err(e) = res {
let _ = sync_tx.send(Err(e)).await;
return;
}
}
});
let nanny_jh = tokio::spawn(async move {
let _ = main_loop_jh.await.map_err(|e| {
if e.is_panic() {
error!("BlockSynchornizer main loop panicked: {e}")
}
});
debug!("Main loop exited. Closing synchronizers");
Self::cleanup_synchronizers(state_sync_tasks, sync_close_senders).await;
debug!("Shutdown complete");
});
Ok((nanny_jh, sync_rx))
}
async fn handle_next_message(
&self,
sync_streams: &mut [SynchronizerStream],
ready_sync_msgs: &mut HashMap<String, StateSyncMessage<BlockHeader>>,
block_history: &mut BlockHistory,
) -> BlockSyncResult<()> {
let mut recv_futures = Vec::new();
for stream in sync_streams.iter_mut() {
if stream.has_ended() {
continue;
}
recv_futures.push(async {
let res = stream
.try_advance(
block_history,
self.block_time,
self.latency_buffer,
self.block_time
.mul_f64(self.max_missed_blocks as f64),
)
.await?;
Ok::<_, BlockSynchronizerError>(
res.map(|msg| (stream.extractor_id.name.clone(), msg)),
)
});
}
ready_sync_msgs.extend(
join_all(recv_futures)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.flatten(),
);
Self::check_streams(sync_streams)?;
if sync_streams
.iter()
.any(SynchronizerStream::is_advanced)
{
*block_history = Self::reinit_block_history(sync_streams, block_history)?;
} else {
let header = sync_streams
.iter()
.filter_map(SynchronizerStream::get_current_header)
.max_by_key(|b| b.number)
.ok_or(BlockSynchronizerError::NoReadySynchronizers(
"Expected to have at least one synchronizer that is not stale or ended"
.to_string(),
))?;
block_history.push(header.clone())?;
}
Ok(())
}
fn reinit_block_history(
sync_streams: &mut [SynchronizerStream],
block_history: &mut BlockHistory,
) -> Result<BlockHistory, BlockSynchronizerError> {
let previous = block_history
.latest()
.ok_or(BlockHistoryError::EmptyHistory)?;
let blocks = sync_streams
.iter()
.filter_map(SynchronizerStream::get_current_header)
.cloned()
.collect();
let new_block_history = BlockHistory::new(blocks, 10)?;
let latest = new_block_history
.latest()
.ok_or(BlockHistoryError::EmptyHistory)?;
info!(
%previous,
%latest,
"Advanced synchronizer detected. Reinitialized block history."
);
sync_streams
.iter_mut()
.for_each(|stream| {
if let Some(header) = stream.get_current_header() {
if header.number < latest.number {
stream.state = SynchronizerState::Delayed(header.clone());
} else if header.number == latest.number {
stream.state = SynchronizerState::Ready(header.clone());
}
}
});
Ok(new_block_history)
}
fn check_streams(sync_streams: &[SynchronizerStream]) -> BlockSyncResult<()> {
let mut latest_errored_stream: Option<&SynchronizerStream> = None;
for stream in sync_streams.iter() {
if !stream.has_ended() && !stream.is_stale() {
return Ok(());
}
if latest_errored_stream.is_none() ||
stream.modify_ts >
latest_errored_stream
.as_ref()
.unwrap()
.modify_ts
{
latest_errored_stream = Some(stream);
}
}
let last_error_reason = if let Some(stream) = latest_errored_stream {
if let Some(err) = &stream.error {
format!("Synchronizer for {} errored with: {err}", stream.extractor_id)
} else {
format!("Synchronizer for {} became: {}", stream.extractor_id, stream.state)
}
} else {
return Err(BlockSynchronizerError::NoSynchronizers);
};
let mut reason = vec![last_error_reason];
sync_streams.iter().for_each(|stream| {
reason.push(format!(
"{} reported as {} at {}",
stream.extractor_id, stream.state, stream.modify_ts
))
});
Err(BlockSynchronizerError::NoReadySynchronizers(reason.join(", ")))
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use async_trait::async_trait;
use test_log::test;
use tokio::sync::{oneshot, Mutex};
use tycho_common::dto::Chain;
use super::*;
use crate::feed::synchronizer::{SyncResult, SynchronizerTaskHandle};
#[derive(Clone, Debug)]
enum MockBehavior {
Normal, IgnoreClose, ExitImmediately, }
type HeaderReceiver = Receiver<SyncResult<StateSyncMessage<BlockHeader>>>;
#[derive(Clone)]
struct MockStateSync {
header_tx: mpsc::Sender<SyncResult<StateSyncMessage<BlockHeader>>>,
header_rx: Arc<Mutex<Option<HeaderReceiver>>>,
close_received: Arc<Mutex<bool>>,
behavior: MockBehavior,
close_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
}
impl MockStateSync {
fn new() -> Self {
Self::with_behavior(MockBehavior::Normal)
}
fn with_behavior(behavior: MockBehavior) -> Self {
let (tx, rx) = mpsc::channel(1);
Self {
header_tx: tx,
header_rx: Arc::new(Mutex::new(Some(rx))),
close_received: Arc::new(Mutex::new(false)),
behavior,
close_tx: Arc::new(Mutex::new(None)),
}
}
async fn was_close_received(&self) -> bool {
*self.close_received.lock().await
}
async fn send_header(&self, header: StateSyncMessage<BlockHeader>) -> Result<(), String> {
self.header_tx
.send(Ok(header))
.await
.map_err(|e| format!("sending header failed: {e}"))
}
async fn trigger_close(&self) {
if let Some(close_tx) = self.close_tx.lock().await.take() {
let _ = close_tx.send(());
}
}
}
#[async_trait]
impl StateSynchronizer for MockStateSync {
async fn initialize(&mut self) -> SyncResult<()> {
Ok(())
}
async fn start(
mut self,
) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>) {
let block_rx = {
let mut guard = self.header_rx.lock().await;
guard
.take()
.expect("Block receiver was not set!")
};
let (close_tx_for_handle, close_rx) = oneshot::channel();
let (close_tx_for_test, close_rx_for_test) = oneshot::channel();
{
let mut guard = self.close_tx.lock().await;
*guard = Some(close_tx_for_test);
}
let behavior = self.behavior.clone();
let close_received_clone = self.close_received.clone();
let tx = self.header_tx.clone();
let jh = tokio::spawn(async move {
match behavior {
MockBehavior::IgnoreClose => {
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}
MockBehavior::ExitImmediately => {
tx.send(SyncResult::Err(SynchronizerError::ConnectionError(
"Simulated immediate task failure".to_string(),
)))
.await
.unwrap();
}
MockBehavior::Normal => {
let _ = tokio::select! {
result = close_rx => result,
result = close_rx_for_test => result,
};
let mut guard = close_received_clone.lock().await;
*guard = true;
}
}
});
let handle = SynchronizerTaskHandle::new(jh, close_tx_for_handle);
(handle, block_rx)
}
}
fn header_message(block: u8) -> StateSyncMessage<BlockHeader> {
StateSyncMessage {
header: BlockHeader {
number: block as u64,
hash: Bytes::from(vec![block]),
parent_hash: Bytes::from(vec![block - 1]),
revert: false,
timestamp: 1000,
partial_block_index: None,
},
..Default::default()
}
}
fn partial_header_message(block: u8, partial_idx: u32) -> StateSyncMessage<BlockHeader> {
let hash_bytes =
[(block as u64).to_be_bytes().as_slice(), partial_idx.to_be_bytes().as_slice()]
.concat();
StateSyncMessage {
header: BlockHeader {
number: block as u64,
hash: Bytes::from(hash_bytes),
parent_hash: Bytes::from(vec![block - 1]),
revert: false,
timestamp: 1000,
partial_block_index: Some(partial_idx),
},
..Default::default()
}
}
fn revert_header_message(block: u8) -> StateSyncMessage<BlockHeader> {
StateSyncMessage {
header: BlockHeader {
number: block as u64,
hash: Bytes::from(vec![block]),
parent_hash: Bytes::from(vec![block - 1]),
revert: true,
timestamp: 1000,
partial_block_index: None,
},
..Default::default()
}
}
async fn receive_message(rx: &mut Receiver<BlockSyncResult<FeedMessage>>) -> FeedMessage {
timeout(Duration::from_millis(100), rx.recv())
.await
.expect("Responds in time")
.expect("Should receive first message")
.expect("No error")
}
async fn setup_block_sync(
) -> (MockStateSync, MockStateSync, JoinHandle<()>, Receiver<BlockSyncResult<FeedMessage>>)
{
setup_block_sync_with_behaviour(MockBehavior::Normal, MockBehavior::Normal).await
}
async fn setup_block_sync_with_behaviour(
v2_behavior: MockBehavior,
v3_behavior: MockBehavior,
) -> (MockStateSync, MockStateSync, JoinHandle<()>, Receiver<BlockSyncResult<FeedMessage>>)
{
let v2_sync = MockStateSync::with_behavior(v2_behavior);
let v3_sync = MockStateSync::with_behavior(v3_behavior);
let mut block_sync = BlockSynchronizer::new(
Duration::from_millis(20), Duration::from_millis(10), 3, );
block_sync.max_messages(10);
let block_sync = block_sync
.register_synchronizer(
ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
v2_sync.clone(),
)
.register_synchronizer(
ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
v3_sync.clone(),
);
let block1_msg = header_message(1);
let _ = v2_sync
.send_header(block1_msg.clone())
.await;
let _ = v3_sync
.send_header(block1_msg.clone())
.await;
let (nanny_handle, mut rx) = block_sync
.run()
.await
.expect("BlockSynchronizer failed to start");
let first_feed_msg = receive_message(&mut rx).await;
assert_eq!(first_feed_msg.state_msgs.len(), 2);
assert!(matches!(
first_feed_msg
.sync_states
.get("uniswap-v2")
.unwrap(),
SynchronizerState::Ready(_)
));
assert!(matches!(
first_feed_msg
.sync_states
.get("uniswap-v3")
.unwrap(),
SynchronizerState::Ready(_)
));
(v2_sync, v3_sync, nanny_handle, rx)
}
async fn shutdown_block_synchronizer(
v2_sync: &MockStateSync,
v3_sync: &MockStateSync,
nanny_handle: JoinHandle<()>,
) {
v3_sync.trigger_close().await;
v2_sync.trigger_close().await;
timeout(Duration::from_millis(100), nanny_handle)
.await
.expect("Nanny failed to exit within time")
.expect("Nanny panicked");
}
async fn send_and_assert_ready(
sync: &MockStateSync,
sync_name: &str,
rx: &mut Receiver<BlockSyncResult<FeedMessage>>,
msg: StateSyncMessage<BlockHeader>,
expected_block: u64,
expected_partial: Option<u32>,
) {
sync.send_header(msg)
.await
.expect("send failed");
let feed_msg = receive_message(rx).await;
let state = feed_msg
.sync_states
.get(sync_name)
.unwrap();
match state {
SynchronizerState::Ready(h) => {
assert_eq!(h.number, expected_block, "wrong block number");
assert_eq!(h.partial_block_index, expected_partial, "wrong partial index");
}
other => panic!("expected Ready, got {:?}", other),
}
}
#[test(tokio::test)]
async fn test_two_ready_synchronizers() {
let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
let second_msg = header_message(2);
v2_sync
.send_header(second_msg.clone())
.await
.expect("send_header failed");
v3_sync
.send_header(second_msg.clone())
.await
.expect("send_header failed");
let second_feed_msg = receive_message(&mut rx).await;
let exp2 = FeedMessage {
state_msgs: [
("uniswap-v2".to_string(), second_msg.clone()),
("uniswap-v3".to_string(), second_msg.clone()),
]
.into_iter()
.collect(),
sync_states: [
("uniswap-v3".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
("uniswap-v2".to_string(), SynchronizerState::Ready(second_msg.header.clone())),
]
.into_iter()
.collect(),
};
assert_eq!(second_feed_msg, exp2);
shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
}
#[test(tokio::test)]
async fn test_delayed_synchronizer_catches_up() {
let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
let block2_msg = header_message(2);
v2_sync
.send_header(block2_msg.clone())
.await
.expect("send_header failed");
let second_feed_msg = receive_message(&mut rx).await;
debug!("Consumed second message for v2");
assert!(second_feed_msg
.state_msgs
.contains_key("uniswap-v2"));
assert!(matches!(
second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
SynchronizerState::Ready(header) if header.number == 2
));
assert!(!second_feed_msg
.state_msgs
.contains_key("uniswap-v3"));
assert!(matches!(
second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
SynchronizerState::Delayed(header) if header.number == 1
));
v3_sync
.send_header(block2_msg.clone())
.await
.expect("send_header failed");
let block3_msg = header_message(3);
v2_sync
.send_header(block3_msg.clone())
.await
.expect("send_header failed");
v3_sync
.send_header(block3_msg)
.await
.expect("send_header failed");
let mut third_feed_msg = receive_message(&mut rx).await;
if !third_feed_msg
.state_msgs
.contains_key("uniswap-v2")
{
third_feed_msg = rx
.recv()
.await
.expect("header channel was closed")
.expect("no error");
}
assert!(third_feed_msg
.state_msgs
.contains_key("uniswap-v2"));
assert!(third_feed_msg
.state_msgs
.contains_key("uniswap-v3"));
assert!(matches!(
third_feed_msg.sync_states.get("uniswap-v2").unwrap(),
SynchronizerState::Ready(header) if header.number == 3
));
assert!(matches!(
third_feed_msg.sync_states.get("uniswap-v3").unwrap(),
SynchronizerState::Ready(header) if header.number == 3
));
shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
}
#[test(tokio::test)]
async fn test_different_start_blocks() {
let v2_sync = MockStateSync::new();
let v3_sync = MockStateSync::new();
let block_sync = BlockSynchronizer::with_short_timeouts()
.register_synchronizer(
ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v2".to_string() },
v2_sync.clone(),
)
.register_synchronizer(
ExtractorIdentity { chain: Chain::Ethereum, name: "uniswap-v3".to_string() },
v3_sync.clone(),
);
let block1_msg = header_message(1);
let block2_msg = header_message(2);
let _ = v2_sync
.send_header(block1_msg.clone())
.await;
v3_sync
.send_header(block2_msg.clone())
.await
.expect("send_header failed");
let (jh, mut rx) = block_sync
.run()
.await
.expect("BlockSynchronizer failed to start.");
let first_feed_msg = receive_message(&mut rx).await;
assert!(matches!(
first_feed_msg.sync_states.get("uniswap-v2").unwrap(),
SynchronizerState::Delayed(header) if header.number == 1
));
assert!(matches!(
first_feed_msg.sync_states.get("uniswap-v3").unwrap(),
SynchronizerState::Ready(header) if header.number == 2
));
v2_sync
.send_header(block2_msg.clone())
.await
.expect("send_header failed");
let block3_msg = header_message(3);
let _ = v2_sync
.send_header(block3_msg.clone())
.await;
v3_sync
.send_header(block3_msg.clone())
.await
.expect("send_header failed");
let second_feed_msg = receive_message(&mut rx).await;
assert_eq!(second_feed_msg.state_msgs.len(), 2);
assert!(matches!(
second_feed_msg.sync_states.get("uniswap-v2").unwrap(),
SynchronizerState::Ready(header) if header.number == 3
));
assert!(matches!(
second_feed_msg.sync_states.get("uniswap-v3").unwrap(),
SynchronizerState::Ready(header) if header.number == 3
));
shutdown_block_synchronizer(&v2_sync, &v3_sync, jh).await;
}
#[test(tokio::test)]
async fn test_synchronizer_fails_other_goes_stale() {
let (_v2_sync, v3_sync, nanny_handle, mut sync_rx) =
setup_block_sync_with_behaviour(MockBehavior::ExitImmediately, MockBehavior::Normal)
.await;
let mut error_reported = false;
for _ in 0..3 {
if let Some(msg) = sync_rx.recv().await {
match msg {
Err(_) => error_reported = true,
Ok(msg) => {
assert!(matches!(
msg.sync_states
.get("uniswap-v3")
.unwrap(),
SynchronizerState::Delayed(_)
));
assert!(matches!(
msg.sync_states
.get("uniswap-v2")
.unwrap(),
SynchronizerState::Ended(_)
));
}
}
}
}
assert!(error_reported, "BlockSynchronizer did not report final error");
let result = timeout(Duration::from_secs(2), nanny_handle).await;
assert!(result.is_ok(), "Nanny should complete when synchronizer task exits");
assert!(
v3_sync.was_close_received().await,
"v3_sync should have received close signal during cleanup"
);
}
#[test(tokio::test)]
async fn test_cleanup_timeout_warning() {
let (_v2_sync, _v3_sync, nanny_handle, _rx) = setup_block_sync_with_behaviour(
MockBehavior::ExitImmediately,
MockBehavior::IgnoreClose,
)
.await;
let result = timeout(Duration::from_secs(10), nanny_handle).await;
assert!(
result.is_ok(),
"Nanny should complete even when some synchronizers timeout during cleanup"
);
}
#[test(tokio::test)]
async fn test_one_synchronizer_goes_stale_while_other_works() {
let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
let block2_msg = header_message(2);
let _ = v3_sync
.send_header(block2_msg.clone())
.await;
let second_feed_msg = receive_message(&mut rx).await;
assert!(second_feed_msg
.state_msgs
.contains_key("uniswap-v3"));
assert!(!second_feed_msg
.state_msgs
.contains_key("uniswap-v2"));
assert!(matches!(
second_feed_msg
.sync_states
.get("uniswap-v3")
.unwrap(),
SynchronizerState::Ready(_)
));
if let Some(v2_state) = second_feed_msg
.sync_states
.get("uniswap-v2")
{
if matches!(v2_state, SynchronizerState::Delayed(_)) {
assert!(
!nanny_handle.is_finished(),
"Nanny should still be running when synchronizer is delayed (not stale yet)"
);
}
}
tokio::time::sleep(Duration::from_millis(15)).await;
let block3_msg = header_message(3);
let _ = v3_sync
.send_header(block3_msg.clone())
.await;
tokio::time::sleep(Duration::from_millis(40)).await;
let mut stale_found = false;
for _ in 0..2 {
if let Some(Ok(msg)) = rx.recv().await {
if let Some(SynchronizerState::Stale(_)) = msg.sync_states.get("uniswap-v2") {
stale_found = true;
}
}
}
assert!(stale_found, "v2 synchronizer should be stale");
shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
}
#[test(tokio::test)]
async fn test_all_synchronizers_go_stale_main_loop_exits() {
let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
let mut seen_delayed = false;
let timeout_duration = Duration::from_millis(500); let start_time = tokio::time::Instant::now();
while let Ok(Some(Ok(msg))) =
tokio::time::timeout(Duration::from_millis(50), rx.recv()).await
{
if !seen_delayed {
let v2_state = msg.sync_states.get("uniswap-v2");
let v3_state = msg.sync_states.get("uniswap-v3");
if matches!(v2_state, Some(SynchronizerState::Delayed(_))) ||
matches!(v3_state, Some(SynchronizerState::Delayed(_)))
{
seen_delayed = true;
assert!(!nanny_handle.is_finished(),
"Nanny should still be running when synchronizers are delayed (not stale yet)");
break;
}
}
if start_time.elapsed() > timeout_duration {
break;
}
}
assert!(seen_delayed, "Synchronizers should transition to Delayed state first");
let mut error_reported = false;
while let Some(msg) = rx.recv().await {
if let Err(e) = msg {
assert!(e
.to_string()
.contains("became: Stale(1)"));
assert!(e
.to_string()
.contains("reported as Stale(1)"));
error_reported = true;
}
}
assert!(error_reported, "Expected the channel to report an error before closing");
let nanny_result = timeout(Duration::from_secs(2), nanny_handle).await;
assert!(nanny_result.is_ok(), "Nanny should complete when main loop exits");
assert!(
v2_sync.was_close_received().await,
"v2_sync should have received close signal during cleanup"
);
assert!(
v3_sync.was_close_received().await,
"v3_sync should have received close signal during cleanup"
);
}
#[test(tokio::test)]
async fn test_stale_synchronizer_recovers() {
let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
tokio::time::sleep(Duration::from_millis(50)).await;
let block2_msg = header_message(2);
let _ = v2_sync
.send_header(block2_msg.clone())
.await;
for _ in 0..2 {
if let Some(msg) = rx.recv().await {
if let Ok(msg) = msg {
if matches!(
msg.sync_states
.get("uniswap-v2")
.unwrap(),
SynchronizerState::Ready(_)
) {
assert!(matches!(
msg.sync_states
.get("uniswap-v3")
.unwrap(),
SynchronizerState::Delayed(_)
));
break;
};
}
} else {
panic!("Channel closed unexpectedly")
}
}
tokio::time::sleep(Duration::from_millis(15)).await;
let block3_msg = header_message(3);
let _ = v2_sync
.send_header(block3_msg.clone())
.await;
let third_msg = receive_message(&mut rx).await;
dbg!(&third_msg);
assert!(matches!(
third_msg
.sync_states
.get("uniswap-v2")
.unwrap(),
SynchronizerState::Ready(_)
));
assert!(matches!(
third_msg
.sync_states
.get("uniswap-v3")
.unwrap(),
SynchronizerState::Stale(_)
));
let block4_msg = header_message(4);
let _ = v3_sync
.send_header(block2_msg.clone())
.await;
let _ = v3_sync
.send_header(block3_msg.clone())
.await;
let _ = v3_sync
.send_header(block4_msg.clone())
.await;
let _ = v2_sync
.send_header(block4_msg.clone())
.await;
let fourth_msg = receive_message(&mut rx).await;
assert!(matches!(
fourth_msg
.sync_states
.get("uniswap-v2")
.unwrap(),
SynchronizerState::Ready(_)
));
assert!(matches!(
fourth_msg
.sync_states
.get("uniswap-v3")
.unwrap(),
SynchronizerState::Ready(_)
));
shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
assert!(
v2_sync.was_close_received().await,
"v2_sync should have received close signal during cleanup"
);
assert!(
v3_sync.was_close_received().await,
"v3_sync should have received close signal during cleanup"
);
}
#[test(tokio::test)]
async fn test_all_synchronizer_advanced() {
let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
let block3 = header_message(3);
v2_sync
.send_header(block3.clone())
.await
.unwrap();
v3_sync
.send_header(block3)
.await
.unwrap();
let msg = receive_message(&mut rx).await;
matches!(
msg.sync_states
.get("uniswap-v2")
.unwrap(),
SynchronizerState::Ready(_)
);
matches!(
msg.sync_states
.get("uniswap-v3")
.unwrap(),
SynchronizerState::Ready(_)
);
shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
}
#[test(tokio::test)]
async fn test_one_synchronizer_advanced() {
let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
let block2 = header_message(2);
let block4 = header_message(4);
v2_sync
.send_header(block4.clone())
.await
.unwrap();
v3_sync
.send_header(block2.clone())
.await
.unwrap();
let msg = receive_message(&mut rx).await;
matches!(
msg.sync_states
.get("uniswap-v2")
.unwrap(),
SynchronizerState::Ready(_)
);
matches!(
msg.sync_states
.get("uniswap-v3")
.unwrap(),
SynchronizerState::Delayed(_)
);
shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
}
#[test(tokio::test)]
async fn test_partial_blocks_normal_operation() {
let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
send_and_assert_ready(
&v2_sync,
"uniswap-v2",
&mut rx,
partial_header_message(2, 0),
2,
Some(0),
)
.await;
send_and_assert_ready(
&v2_sync,
"uniswap-v2",
&mut rx,
partial_header_message(2, 3),
2,
Some(3),
)
.await;
send_and_assert_ready(
&v2_sync,
"uniswap-v2",
&mut rx,
partial_header_message(2, 7),
2,
Some(7),
)
.await;
send_and_assert_ready(
&v2_sync,
"uniswap-v2",
&mut rx,
partial_header_message(3, 0),
3,
Some(0),
)
.await;
send_and_assert_ready(
&v2_sync,
"uniswap-v2",
&mut rx,
partial_header_message(3, 2),
3,
Some(2),
)
.await;
shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
}
#[test(tokio::test)]
async fn test_partial_blocks_handles_reverts() {
let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
send_and_assert_ready(&v2_sync, "uniswap-v2", &mut rx, header_message(2), 2, None).await;
send_and_assert_ready(
&v2_sync,
"uniswap-v2",
&mut rx,
partial_header_message(3, 0),
3,
Some(0),
)
.await;
send_and_assert_ready(
&v2_sync,
"uniswap-v2",
&mut rx,
partial_header_message(3, 2),
3,
Some(2),
)
.await;
send_and_assert_ready(&v2_sync, "uniswap-v2", &mut rx, revert_header_message(2), 2, None)
.await;
send_and_assert_ready(&v2_sync, "uniswap-v2", &mut rx, header_message(3), 3, None).await;
shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
}
#[test(tokio::test)]
async fn test_partial_blocks_delayed_synchronizer_catches_up() {
let (v2_sync, v3_sync, nanny_handle, mut rx) = setup_block_sync().await;
let partial_0 = partial_header_message(2, 0);
v2_sync
.send_header(partial_0.clone())
.await
.expect("send partial 0 failed");
let msg = receive_message(&mut rx).await;
assert!(msg
.state_msgs
.contains_key("uniswap-v2"));
assert!(!msg
.state_msgs
.contains_key("uniswap-v3"));
assert!(matches!(
msg.sync_states.get("uniswap-v2").unwrap(),
SynchronizerState::Ready(h) if h.partial_block_index == Some(0)
));
assert!(matches!(
msg.sync_states
.get("uniswap-v3")
.unwrap(),
SynchronizerState::Delayed(_)
));
let partial_2 = partial_header_message(2, 2);
v2_sync
.send_header(partial_2.clone())
.await
.expect("send partial 2 failed");
v3_sync
.send_header(partial_0.clone())
.await
.expect("v3 catch up partial 0 failed");
v3_sync
.send_header(partial_2.clone())
.await
.expect("v3 catch up partial 2 failed");
let mut v3_ready = false;
for _ in 0..3 {
let msg = receive_message(&mut rx).await;
if matches!(
msg.sync_states.get("uniswap-v3").unwrap(),
SynchronizerState::Ready(h) if h.partial_block_index == Some(2)
) {
v3_ready = true;
break;
}
}
assert!(v3_ready, "v3 caught up to partial 2");
shutdown_block_synchronizer(&v2_sync, &v3_sync, nanny_handle).await;
}
}