use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
use backoff::backoff::Backoff;
use backoff::ExponentialBackoffBuilder;
use celestia_tendermint::Time;
use celestia_types::ExtendedHeader;
use futures::FutureExt;
use serde::{Deserialize, Serialize};
use tokio::select;
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
use web_time::Instant;
use crate::block_ranges::{BlockRange, BlockRangeExt, BlockRanges};
use crate::events::{EventPublisher, NodeEvent};
use crate::executor::{sleep, spawn, spawn_cancellable, Interval};
use crate::p2p::{P2p, P2pError};
use crate::store::{Store, StoreError};
use crate::utils::OneshotSenderExt;
type Result<T, E = SyncerError> = std::result::Result<T, E>;

/// Maximum backoff interval between retries of fetching the initial network head.
const TRY_INIT_BACKOFF_MAX_INTERVAL: Duration = Duration::from_secs(60);

/// Size of the syncing window (30 days): headers older than this are not fetched.
pub const SYNCING_WINDOW: Duration = Duration::from_secs(30 * 24 * 60 * 60);

/// Errors the [`Syncer`] can produce.
#[derive(Debug, thiserror::Error)]
pub enum SyncerError {
    /// Error propagated from the P2P layer.
    #[error("P2p: {0}")]
    P2p(#[from] P2pError),

    /// Error propagated from the header store.
    #[error("Store: {0}")]
    Store(#[from] StoreError),

    /// The syncer worker task is no longer running.
    #[error("Worker died")]
    WorkerDied,

    /// A reply channel was closed before a response arrived.
    #[error("Channel closed unexpectedly")]
    ChannelClosedUnexpectedly,
}
impl SyncerError {
    /// Whether this error should abort the syncer instead of being retried.
    pub(crate) fn is_fatal(&self) -> bool {
        match self {
            // Wrapped errors decide fatality themselves.
            SyncerError::P2p(e) => e.is_fatal(),
            SyncerError::Store(e) => e.is_fatal(),
            // A dead worker or closed channel cannot be recovered from.
            SyncerError::WorkerDied => true,
            SyncerError::ChannelClosedUnexpectedly => true,
        }
    }
}
/// A dropped reply sender means the worker went away mid-request.
impl From<oneshot::error::RecvError> for SyncerError {
    fn from(_value: oneshot::error::RecvError) -> Self {
        SyncerError::ChannelClosedUnexpectedly
    }
}
/// Handle to the spawned syncer worker task.
#[derive(Debug)]
pub(crate) struct Syncer<S>
where
    S: Store + 'static,
{
    /// Channel for sending commands to the worker.
    cmd_tx: mpsc::Sender<SyncerCmd>,
    /// Cancelling this token stops the worker.
    cancellation_token: CancellationToken,
    /// Ties the handle to the worker's store type; stores no data.
    _store: PhantomData<S>,
}
/// Arguments used to configure the [`Syncer`].
pub(crate) struct SyncerArgs<S>
where
    S: Store + 'static,
{
    /// Handle to the P2P layer used for fetching headers.
    pub(crate) p2p: Arc<P2p>,
    /// Header storage.
    pub(crate) store: Arc<S>,
    /// Publisher of node events.
    pub(crate) event_pub: EventPublisher,
    /// Maximum number of headers fetched in a single batch.
    pub(crate) batch_size: u64,
}
/// Commands the [`Syncer`] handle sends to its worker.
#[derive(Debug)]
enum SyncerCmd {
    /// Request the current [`SyncingInfo`].
    GetInfo {
        respond_to: oneshot::Sender<SyncingInfo>,
    },
}
/// Status of the synchronization.
#[derive(Debug, Serialize, Deserialize)]
pub struct SyncingInfo {
    /// Ranges of headers that are already synchronized.
    pub stored_headers: BlockRanges,
    /// Syncing target: highest network head height seen so far (0 if unknown).
    pub subjective_head: u64,
}
impl<S> Syncer<S>
where
    S: Store,
{
    /// Spawn the syncer worker task and return a handle for controlling it.
    pub(crate) fn start(args: SyncerArgs<S>) -> Result<Self> {
        let token = CancellationToken::new();
        let event_pub = args.event_pub.clone();
        let (cmd_tx, cmd_rx) = mpsc::channel(16);
        let mut worker = Worker::new(args, token.child_token(), cmd_rx)?;

        spawn(async move {
            let Err(e) = worker.run().await else {
                return;
            };
            error!("Syncer stopped because of a fatal error: {e}");
            event_pub.send(NodeEvent::FatalSyncerError {
                error: e.to_string(),
            });
        });

        Ok(Syncer {
            cmd_tx,
            cancellation_token: token,
            _store: PhantomData,
        })
    }

    /// Request the worker to stop.
    pub(crate) fn stop(&self) {
        self.cancellation_token.cancel();
    }

    /// Send a command to the worker, mapping a closed channel to `WorkerDied`.
    async fn send_command(&self, cmd: SyncerCmd) -> Result<()> {
        if self.cmd_tx.send(cmd).await.is_err() {
            return Err(SyncerError::WorkerDied);
        }
        Ok(())
    }

    /// Get the current syncing status from the worker.
    pub(crate) async fn info(&self) -> Result<SyncingInfo> {
        let (tx, rx) = oneshot::channel();
        self.send_command(SyncerCmd::GetInfo { respond_to: tx })
            .await?;
        Ok(rx.await?)
    }
}
impl<S> Drop for Syncer<S>
where
    S: Store,
{
    /// Stop the worker when the handle is dropped.
    fn drop(&mut self) {
        self.cancellation_token.cancel();
    }
}
/// Background task performing the actual syncing work.
struct Worker<S>
where
    S: Store + 'static,
{
    /// Token cancelled when the syncer is stopped or dropped.
    cancellation_token: CancellationToken,
    /// Command channel from the [`Syncer`] handle.
    cmd_rx: mpsc::Receiver<SyncerCmd>,
    /// Publisher of node events.
    event_pub: EventPublisher,
    /// Handle to the P2P layer.
    p2p: Arc<P2p>,
    /// Header storage.
    store: Arc<S>,
    /// Receiver of new heads from header-sub; `Some` only after initialization.
    header_sub_rx: Option<mpsc::Receiver<ExtendedHeader>>,
    /// Highest network head height seen so far; `None` until initialized.
    subjective_head_height: Option<u64>,
    /// Maximum number of headers fetched in a single batch.
    batch_size: u64,
    /// Batch fetch currently in flight, if any.
    ongoing_batch: Option<Ongoing>,
}
/// State of a batch fetch that is currently in flight.
struct Ongoing {
    /// Range of header heights being fetched.
    batch: BlockRange,
    /// Cancelling this token aborts just this fetch task.
    cancellation_token: CancellationToken,
}
impl<S> Worker<S>
where
S: Store,
{
fn new(
args: SyncerArgs<S>,
cancellation_token: CancellationToken,
cmd_rx: mpsc::Receiver<SyncerCmd>,
) -> Result<Self> {
Ok(Worker {
cancellation_token,
cmd_rx,
event_pub: args.event_pub,
p2p: args.p2p,
store: args.store,
header_sub_rx: None,
subjective_head_height: None,
batch_size: args.batch_size,
ongoing_batch: None,
})
}
/// Drive the worker: alternate between the connecting phase (acquiring a
/// subjective head) and the connected phase (syncing headers) until cancelled.
async fn run(&mut self) -> Result<()> {
    while !self.cancellation_token.is_cancelled() {
        self.connecting_event_loop().await?;

        // The connecting phase may also exit because of cancellation.
        if self.cancellation_token.is_cancelled() {
            break;
        }

        self.connected_event_loop().await?;
    }

    debug!("Syncer stopped");
    Ok(())
}
/// Event loop run before the subjective head is known.
///
/// Retries head initialization in a background task while still serving
/// periodic status reports and commands. Returns once the subjective head is
/// set and header-sub is initialized, or when the worker is cancelled.
async fn connecting_event_loop(&mut self) -> Result<()> {
    debug!("Entering connecting_event_loop");
    let mut report_interval = Interval::new(Duration::from_secs(60)).await;
    let mut try_init_result = self.spawn_try_init().fuse();
    self.report().await?;
    loop {
        select! {
            _ = self.cancellation_token.cancelled() => {
                break;
            }
            _ = report_interval.tick() => {
                self.report().await?;
            }
            res = &mut try_init_result => {
                // Outer `?`: the init task's channel closed; inner `?`: init failed fatally.
                let (network_head, took) = res??;
                let network_head_height = network_head.height().value();
                info!("Setting initial subjective head to {network_head_height}");
                self.set_subjective_head_height(network_head_height);
                // Start listening for new heads announced by the network.
                let (header_sub_tx, header_sub_rx) = mpsc::channel(16);
                self.p2p.init_header_sub(network_head, header_sub_tx).await?;
                self.header_sub_rx = Some(header_sub_rx);
                self.event_pub.send(NodeEvent::FetchingHeadHeaderFinished {
                    height: network_head_height,
                    took,
                });
                break;
            }
            Some(cmd) = self.cmd_rx.recv() => {
                self.on_cmd(cmd).await?;
            }
        }
    }
    Ok(())
}
/// Event loop run after the subjective head is known.
///
/// Schedules batch fetches, applies new heads from header-sub, and serves
/// reports and commands. Returns when the worker is cancelled or when all
/// peers disconnect (the caller then re-enters the connecting phase).
async fn connected_event_loop(&mut self) -> Result<()> {
    debug!("Entering connected_event_loop");
    // Capacity 1 is enough: at most one batch is in flight at a time.
    let (headers_tx, mut headers_rx) = mpsc::channel(1);
    let mut report_interval = Interval::new(Duration::from_secs(60)).await;
    let mut peer_tracker_info_watcher = self.p2p.peer_tracker_info_watcher();
    // Bail out immediately if we already lost all peers.
    if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
        warn!("All peers disconnected");
        return Ok(());
    }
    self.fetch_next_batch(&headers_tx).await?;
    self.report().await?;
    loop {
        select! {
            _ = self.cancellation_token.cancelled() => {
                break;
            }
            _ = peer_tracker_info_watcher.changed() => {
                if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
                    warn!("All peers disconnected");
                    break;
                }
            }
            _ = report_interval.tick() => {
                self.report().await?;
            }
            res = header_sub_recv(self.header_sub_rx.as_mut()) => {
                let header = res?;
                self.on_header_sub_message(header).await?;
                self.fetch_next_batch(&headers_tx).await?;
            }
            Some(cmd) = self.cmd_rx.recv() => {
                self.on_cmd(cmd).await?;
            }
            Some((res, took)) = headers_rx.recv() => {
                self.on_fetch_next_batch_result(res, took).await?;
                self.fetch_next_batch(&headers_tx).await?;
            }
        }
    }
    // Abort any in-flight batch and drop header-sub before leaving this phase.
    if let Some(ongoing) = self.ongoing_batch.take() {
        warn!("Cancelling fetching of {}", ongoing.batch.display());
        ongoing.cancellation_token.cancel();
    }
    self.header_sub_rx.take();
    Ok(())
}
/// Build a snapshot of the current syncing state.
async fn syncing_info(&self) -> Result<SyncingInfo> {
    let stored_headers = self.store.get_stored_header_ranges().await?;
    let subjective_head = self.subjective_head_height.unwrap_or(0);

    Ok(SyncingInfo {
        stored_headers,
        subjective_head,
    })
}
/// Log a one-line status report: subjective head, stored ranges, ongoing batch.
#[instrument(skip_all)]
async fn report(&mut self) -> Result<()> {
    let SyncingInfo {
        stored_headers,
        subjective_head,
    } = self.syncing_info().await?;

    let ongoing_batch = match &self.ongoing_batch {
        Some(ongoing) => ongoing.batch.display().to_string(),
        None => "None".to_string(),
    };

    info!("syncing: head: {subjective_head}, stored headers: {stored_headers}, ongoing batches: {ongoing_batch}");
    Ok(())
}
/// Spawn a background task that keeps trying to initialize the subjective
/// head with exponential backoff, and return a receiver for the result.
///
/// The task is cancelled together with the worker.
fn spawn_try_init(&self) -> oneshot::Receiver<Result<(ExtendedHeader, Duration)>> {
    let p2p = self.p2p.clone();
    let store = self.store.clone();
    let event_pub = self.event_pub.clone();
    let (tx, rx) = oneshot::channel();
    let fut = async move {
        // Ensure the "started" event is published only once across retries.
        let mut event_reported = false;
        let now = Instant::now();
        let mut backoff = ExponentialBackoffBuilder::default()
            .with_max_interval(TRY_INIT_BACKOFF_MAX_INTERVAL)
            .with_max_elapsed_time(None)
            .build();
        loop {
            match try_init(&p2p, &*store, &event_pub, &mut event_reported).await {
                Ok(network_head) => {
                    // Report the head together with how long initialization took.
                    tx.maybe_send(Ok((network_head, now.elapsed())));
                    break;
                }
                Err(e) if e.is_fatal() => {
                    tx.maybe_send(Err(e));
                    break;
                }
                Err(e) => {
                    // Non-fatal error: retry after a backoff delay.
                    let sleep_dur = backoff
                        .next_backoff()
                        .expect("backoff never stops retrying");
                    warn!("Initialization of subjective head failed: {e}. Trying again in {sleep_dur:?}.");
                    sleep(sleep_dur).await;
                }
            }
        }
    };
    spawn_cancellable(
        self.cancellation_token.child_token(),
        fut.instrument(info_span!("try_init")),
    );
    rx
}
/// Handle a single command sent from the [`Syncer`] handle.
async fn on_cmd(&mut self, cmd: SyncerCmd) -> Result<()> {
    match cmd {
        SyncerCmd::GetInfo { respond_to } => {
            let info = self.syncing_info().await?;
            // The requester may be gone; dropping the reply is fine.
            respond_to.maybe_send(info);
        }
    }
    Ok(())
}
/// Handle a new head announced via header-sub: raise the subjective head and,
/// when it directly extends the store head, append it to the store.
#[instrument(skip_all)]
async fn on_header_sub_message(&mut self, new_head: ExtendedHeader) -> Result<()> {
    let new_head_height = new_head.height().value();
    self.set_subjective_head_height(new_head_height);

    // An empty store means there is nothing to extend yet.
    let Ok(store_head_height) = self.store.head_height().await else {
        return Ok(());
    };

    // Only append heads that are exactly one above the store head; any gap
    // is filled by the regular batch-fetching path instead.
    if store_head_height + 1 != new_head_height {
        return Ok(());
    }

    if self.store.insert(new_head).await.is_ok() {
        self.event_pub.send(NodeEvent::AddedHeaderFromHeaderSub {
            height: new_head_height,
        });
    }

    Ok(())
}
/// Raise the subjective head height; lower or equal values are ignored.
fn set_subjective_head_height(&mut self, height: u64) {
    let is_new_maximum = self
        .subjective_head_height
        .map_or(true, |old_height| height > old_height);

    if is_new_maximum {
        self.subjective_head_height = Some(height);
    }
}
/// Schedule fetching of the next batch of headers, if one should run.
///
/// Does nothing when a batch is already in flight, no peers are connected,
/// the subjective head is not known yet, there is nothing left to fetch, or
/// the next batch falls outside of the syncing window.
#[instrument(skip_all)]
async fn fetch_next_batch(
    &mut self,
    headers_tx: &mpsc::Sender<(Result<Vec<ExtendedHeader>, P2pError>, Duration)>,
) -> Result<()> {
    // Only one batch at a time.
    if self.ongoing_batch.is_some() {
        return Ok(());
    }
    // No connected peers means the request cannot be served.
    if self.p2p.peer_tracker_info().num_connected_peers == 0 {
        return Ok(());
    }
    let Some(subjective_head_height) = self.subjective_head_height else {
        // Nothing to schedule until the subjective head is known.
        return Ok(());
    };
    let store_ranges = self.store.get_stored_header_ranges().await?;
    let next_batch = calculate_range_to_fetch(
        subjective_head_height,
        store_ranges.as_ref(),
        self.batch_size,
    );
    if next_batch.is_empty() {
        // Fully synced.
        return Ok(());
    }
    // When syncing backwards, use the already-stored header right above the
    // batch to decide whether the batch is still inside the syncing window.
    match self.store.get_by_height(next_batch.end() + 1).await {
        Ok(known_header) => {
            if !in_syncing_window(&known_header) {
                return Ok(());
            }
        }
        // No header above the batch — we are syncing forward; proceed.
        Err(StoreError::NotFound) => {}
        Err(e) => return Err(e.into()),
    }
    self.event_pub.send(NodeEvent::FetchingHeadersStarted {
        from_height: *next_batch.start(),
        to_height: *next_batch.end(),
    });
    // Child token lets us abort just this fetch without stopping the worker.
    let cancellation_token = self.cancellation_token.child_token();
    self.ongoing_batch = Some(Ongoing {
        batch: next_batch.clone(),
        cancellation_token: cancellation_token.clone(),
    });
    let tx = headers_tx.clone();
    let p2p = self.p2p.clone();
    spawn_cancellable(cancellation_token, async move {
        let now = Instant::now();
        let res = p2p.get_unverified_header_range(next_batch).await;
        // Receiver may be gone if the connected phase ended; ignore the error.
        let _ = tx.send((res, now.elapsed())).await;
    });
    Ok(())
}
/// Handle the outcome of a finished batch fetch.
///
/// Clears the ongoing batch, then publishes exactly one of
/// `FetchingHeadersFailed` (non-fatal fetch/store error) or
/// `FetchingHeadersFinished` (headers fetched and stored). Fatal errors are
/// propagated and stop the worker.
#[instrument(skip_all)]
async fn on_fetch_next_batch_result(
    &mut self,
    res: Result<Vec<ExtendedHeader>, P2pError>,
    took: Duration,
) -> Result<()> {
    let Some(ongoing) = self.ongoing_batch.take() else {
        warn!("No batch was scheduled, however result was received. Discarding it.");
        return Ok(());
    };

    let from_height = *ongoing.batch.start();
    let to_height = *ongoing.batch.end();

    let headers = match res {
        Ok(headers) => headers,
        Err(e) => {
            if e.is_fatal() {
                return Err(e.into());
            }
            self.event_pub.send(NodeEvent::FetchingHeadersFailed {
                from_height,
                to_height,
                error: e.to_string(),
                took,
            });
            return Ok(());
        }
    };

    if let Err(e) = self.store.insert(headers).await {
        if e.is_fatal() {
            return Err(e.into());
        }
        self.event_pub.send(NodeEvent::FetchingHeadersFailed {
            from_height,
            to_height,
            error: format!("Failed to store headers: {e}"),
            took,
        });
        // Bug fix: previously this fell through and also emitted
        // `FetchingHeadersFinished` for a batch that failed to persist,
        // producing contradictory events for the same range. The batch will
        // be re-scheduled by the next `fetch_next_batch` call.
        return Ok(());
    }

    self.event_pub.send(NodeEvent::FetchingHeadersFinished {
        from_height,
        to_height,
        took,
    });

    Ok(())
}
}
/// Decide which height range to fetch next, at most `limit` headers long.
///
/// Syncing forward (above the highest stored range) takes precedence; once
/// caught up with the subjective head, the gap directly below the highest
/// stored range is fetched backwards instead.
fn calculate_range_to_fetch(
    subjective_head_height: u64,
    store_headers: &[BlockRange],
    limit: u64,
) -> BlockRange {
    let mut ranges_from_top = store_headers.iter().rev();

    match ranges_from_top.next() {
        // Empty store: start syncing from genesis towards the head.
        None => (1..=subjective_head_height).truncate_right(limit),
        // Store head is behind the network head: sync forward.
        Some(head_range) if *head_range.end() < subjective_head_height => {
            (head_range.end() + 1..=subjective_head_height).truncate_right(limit)
        }
        // Caught up: fill the gap right below the highest stored range,
        // taking the newest headers of that gap first.
        Some(head_range) => {
            let gap_start = ranges_from_top.next().map_or(0, |r| *r.end()) + 1;
            let gap_end = head_range.start().saturating_sub(1);
            (gap_start..=gap_end).truncate_left(limit)
        }
    }
}
/// Check whether a header's timestamp falls inside the syncing window.
fn in_syncing_window(header: &ExtendedHeader) -> bool {
    let window_start = match Time::now().checked_sub(SYNCING_WINDOW) {
        Some(start) => start,
        None => {
            warn!("underflow when computing syncing window start, defaulting to unix epoch");
            Time::unix_epoch()
        }
    };

    header.time().after(window_start)
}
/// Fetch the network head from a trusted peer and persist it.
///
/// Publishes `FetchingHeadHeaderStarted` only once per initialization series,
/// tracked through `event_reported`. Returns the fetched network head.
async fn try_init<S>(
    p2p: &P2p,
    store: &S,
    event_pub: &EventPublisher,
    event_reported: &mut bool,
) -> Result<ExtendedHeader>
where
    S: Store,
{
    p2p.wait_connected_trusted().await?;

    if !*event_reported {
        event_pub.send(NodeEvent::FetchingHeadHeaderStarted);
        *event_reported = true;
    }

    let network_head = p2p.get_head_header().await?;

    // Insert unless the store already has exactly this header on top.
    let stored_head_hash = match store.get_head().await {
        Ok(stored_head) => Some(stored_head.hash()),
        Err(StoreError::NotFound) => None,
        Err(e) => return Err(e.into()),
    };

    if stored_head_hash != Some(network_head.hash()) {
        store.insert(network_head.clone()).await?;
    }

    Ok(network_head)
}
/// Receive the next header from header-sub.
///
/// Panics when called before header-sub was initialized; returns an error
/// when the channel was closed (the P2p worker died).
async fn header_sub_recv(
    rx: Option<&mut mpsc::Receiver<ExtendedHeader>>,
) -> Result<ExtendedHeader> {
    let rx = rx.expect("header-sub not initialized");

    match rx.recv().await {
        Some(header) => Ok(header),
        None => Err(SyncerError::P2p(P2pError::WorkerDied)),
    }
}
#[cfg(test)]
mod tests {
    use std::ops::RangeInclusive;

    use super::*;
    use crate::block_ranges::{BlockRange, BlockRangeExt};
    use crate::events::EventChannel;
    use crate::p2p::header_session;
    use crate::store::InMemoryStore;
    use crate::test_utils::{async_test, gen_filled_store, MockP2pHandle};
    use celestia_types::test_utils::ExtendedHeaderGenerator;

    // `calculate_range_to_fetch` caps the returned range at the given limit.
    #[test]
    fn calculate_range_to_fetch_test_header_limit() {
        let head_height = 1024;
        let ranges = [256..=512];

        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 16);
        assert_eq!(fetch_range, 513..=528);
        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 511);
        assert_eq!(fetch_range, 513..=1023);
        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 512);
        assert_eq!(fetch_range, 513..=1024);
        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 513);
        assert_eq!(fetch_range, 513..=1024);
        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 1024);
        assert_eq!(fetch_range, 513..=1024);
    }

    // With an empty store, syncing starts from genesis towards the head.
    #[test]
    fn calculate_range_to_fetch_empty_store() {
        let fetch_range = calculate_range_to_fetch(1, &[], 100);
        assert_eq!(fetch_range, 1..=1);
        let fetch_range = calculate_range_to_fetch(100, &[], 10);
        assert_eq!(fetch_range, 1..=10);
        let fetch_range = calculate_range_to_fetch(100, &[], 50);
        assert_eq!(fetch_range, 1..=50);
    }

    // Nothing to fetch when the store already covers up to the head.
    #[test]
    fn calculate_range_to_fetch_fully_synced() {
        let fetch_range = calculate_range_to_fetch(1, &[1..=1], 100);
        assert!(fetch_range.is_empty());
        let fetch_range = calculate_range_to_fetch(100, &[1..=100], 10);
        assert!(fetch_range.is_empty());
        let fetch_range = calculate_range_to_fetch(100, &[1..=100], 10);
        assert!(fetch_range.is_empty());
    }

    // Once caught up with the head, gaps below the top range are filled,
    // newest headers first.
    #[test]
    fn calculate_range_to_fetch_caught_up() {
        let head_height = 4000;

        let fetch_range = calculate_range_to_fetch(head_height, &[3000..=4000], 500);
        assert_eq!(fetch_range, 2500..=2999);
        let fetch_range = calculate_range_to_fetch(head_height, &[500..=1000, 3000..=4000], 500);
        assert_eq!(fetch_range, 2500..=2999);
        let fetch_range = calculate_range_to_fetch(head_height, &[2500..=2800, 3000..=4000], 500);
        assert_eq!(fetch_range, 2801..=2999);
        let fetch_range = calculate_range_to_fetch(head_height, &[2500..=2800, 3000..=4000], 500);
        assert_eq!(fetch_range, 2801..=2999);
        let fetch_range = calculate_range_to_fetch(head_height, &[300..=4000], 500);
        assert_eq!(fetch_range, 1..=299);
    }

    // Syncing forward towards the head takes precedence over filling gaps.
    #[test]
    fn calculate_range_to_fetch_catching_up() {
        let head_height = 4000;

        let fetch_range = calculate_range_to_fetch(head_height, &[2000..=3000], 500);
        assert_eq!(fetch_range, 3001..=3500);
        let fetch_range = calculate_range_to_fetch(head_height, &[2000..=3500], 500);
        assert_eq!(fetch_range, 3501..=4000);
        let fetch_range = calculate_range_to_fetch(head_height, &[1..=2998, 3000..=3800], 500);
        assert_eq!(fetch_range, 3801..=4000);
    }

    // Initialization waits for a trusted peer before requesting the head.
    #[async_test]
    async fn init_without_genesis_hash() {
        let events = EventChannel::new();
        let (mock, mut handle) = P2p::mocked();
        let mut gen = ExtendedHeaderGenerator::new();
        let header = gen.next();

        let _syncer = Syncer::start(SyncerArgs {
            p2p: Arc::new(mock),
            store: Arc::new(InMemoryStore::new()),
            event_pub: events.publisher(),
            batch_size: 512,
        })
        .unwrap();

        // No requests until a trusted peer connects.
        handle.expect_no_cmd().await;
        handle.announce_peer_connected();
        handle.expect_no_cmd().await;
        handle.announce_trusted_peer_connected();

        // Height 0 with amount 1 means "give me your head header".
        let (height, amount, respond_to) = handle.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);
        respond_to.send(Ok(vec![header.clone()])).unwrap();

        let head_from_syncer = handle.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, header);
        handle.expect_no_cmd().await;
    }

    #[async_test]
    async fn init_with_genesis_hash() {
        let mut gen = ExtendedHeaderGenerator::new();
        let head = gen.next();
        let (_syncer, _store, mut p2p_mock) = initialized_syncer(head.clone()).await;
        p2p_mock.expect_no_cmd().await;
    }

    // Full syncing scenario: backward batches, new heads from header-sub,
    // and forward batches once behind the head again.
    #[async_test]
    async fn syncing() {
        let mut gen = ExtendedHeaderGenerator::new();
        let headers = gen.next_many(1500);
        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[1499].clone()).await;
        assert_syncing(&syncer, &store, &[1500..=1500], 1500).await;

        // Backward batches of `batch_size` (512).
        handle_session_batch(&mut p2p_mock, &headers, 988..=1499, true).await;
        assert_syncing(&syncer, &store, &[988..=1500], 1500).await;
        handle_session_batch(&mut p2p_mock, &headers, 476..=987, true).await;
        assert_syncing(&syncer, &store, &[476..=1500], 1500).await;

        // A new head raises the subjective head but doesn't extend the store.
        let header1501 = gen.next();
        p2p_mock.announce_new_head(header1501.clone());
        assert_syncing(&syncer, &store, &[476..=1501], 1501).await;

        handle_session_batch(&mut p2p_mock, &headers, 1..=475, true).await;
        assert_syncing(&syncer, &store, &[1..=1501], 1501).await;
        p2p_mock.expect_no_cmd().await;

        // A head directly above the store head is appended from header-sub.
        let header1502 = gen.next();
        p2p_mock.announce_new_head(header1502.clone());
        assert_syncing(&syncer, &store, &[1..=1502], 1502).await;
        p2p_mock.expect_no_cmd().await;

        // A head with a gap triggers a forward fetch instead.
        let headers_1503_1505 = gen.next_many(3);
        p2p_mock.announce_new_head(headers_1503_1505[2].clone());
        assert_syncing(&syncer, &store, &[1..=1502], 1505).await;
        handle_session_batch(&mut p2p_mock, &headers_1503_1505, 1503..=1505, true).await;
        assert_syncing(&syncer, &store, &[1..=1505], 1505).await;

        let mut headers = gen.next_many(1495);
        p2p_mock.announce_new_head(headers[1494].clone());
        assert_syncing(&syncer, &store, &[1..=1505], 3000).await;
        handle_session_batch(&mut p2p_mock, &headers, 1506..=2017, true).await;
        assert_syncing(&syncer, &store, &[1..=2017], 3000).await;

        headers.push(gen.next());
        p2p_mock.announce_new_head(headers.last().unwrap().clone());
        assert_syncing(&syncer, &store, &[1..=2017], 3001).await;
        handle_session_batch(&mut p2p_mock, &headers, 2018..=2529, true).await;
        assert_syncing(&syncer, &store, &[1..=2529], 3001).await;
        handle_session_batch(&mut p2p_mock, &headers, 2530..=3001, true).await;
        assert_syncing(&syncer, &store, &[1..=3001], 3001).await;
        p2p_mock.expect_no_cmd().await;
    }

    // Backward syncing stops once headers fall outside the syncing window.
    #[async_test]
    async fn window_edge() {
        let month_and_day_ago = Duration::from_secs(31 * 24 * 60 * 60);
        let mut gen = ExtendedHeaderGenerator::new();
        // First 1200 headers are older than the syncing window.
        gen.set_time(
            (Time::now() - month_and_day_ago).expect("to not underflow"),
            Duration::from_secs(1),
        );
        let mut headers = gen.next_many(1200);
        gen.reset_time();
        headers.append(&mut gen.next_many(2049 - 1200));

        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[2048].clone()).await;
        assert_syncing(&syncer, &store, &[2049..=2049], 2049).await;

        handle_session_batch(&mut p2p_mock, &headers, 1537..=2048, true).await;
        assert_syncing(&syncer, &store, &[1537..=2049], 2049).await;
        handle_session_batch(&mut p2p_mock, &headers, 1025..=1536, true).await;
        assert_syncing(&syncer, &store, &[1025..=2049], 2049).await;

        // Header below 1025 is outside the window — no more requests.
        p2p_mock.expect_no_cmd().await;
    }

    // A pre-filled store leaves a gap between its head and the network head.
    #[async_test]
    async fn start_with_filled_store() {
        let events = EventChannel::new();
        let (p2p, mut p2p_mock) = P2p::mocked();
        let (store, mut gen) = gen_filled_store(25).await;
        let store = Arc::new(store);
        let mut headers = gen.next_many(520);
        let network_head = gen.next();

        let syncer = Syncer::start(SyncerArgs {
            p2p: Arc::new(p2p),
            store: store.clone(),
            event_pub: events.publisher(),
            batch_size: 512,
        })
        .unwrap();

        p2p_mock.announce_trusted_peer_connected();

        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);
        respond_to.send(Ok(vec![network_head.clone()])).unwrap();

        let head_from_syncer = p2p_mock.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, network_head);

        assert_syncing(&syncer, &store, &[1..=25, 546..=546], 546).await;

        handle_session_batch(&mut p2p_mock, &headers, 34..=545, true).await;
        assert_syncing(&syncer, &store, &[1..=25, 34..=546], 546).await;

        // Remaining small gap [26, 33] is fetched in a single request.
        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 26);
        assert_eq!(amount, 8);
        respond_to
            .send(Ok(headers.drain(..8).collect()))
            .map_err(|_| "headers [538, 545]")
            .unwrap();

        assert_syncing(&syncer, &store, &[1..=546], 546).await;
        p2p_mock.expect_no_cmd().await;
    }

    // After `stop()` the worker is gone and commands fail with `WorkerDied`.
    #[async_test]
    async fn stop_syncer() {
        let mut gen = ExtendedHeaderGenerator::new();
        let head = gen.next();
        let (syncer, _store, mut p2p_mock) = initialized_syncer(head.clone()).await;
        p2p_mock.expect_no_cmd().await;
        syncer.stop();
        sleep(Duration::from_millis(1)).await;
        assert!(matches!(
            syncer.info().await.unwrap_err(),
            SyncerError::WorkerDied
        ));
    }

    // Losing all peers re-enters the connecting phase and re-initializes.
    #[async_test]
    async fn all_peers_disconnected() {
        let mut gen = ExtendedHeaderGenerator::new();
        let _gap = gen.next_many(24);
        let header25 = gen.next();
        let _gap = gen.next_many(4);
        let header30 = gen.next();
        let _gap = gen.next_many(4);
        let header35 = gen.next();

        let (syncer, store, mut p2p_mock) = initialized_syncer(header30).await;
        // Batch in flight gets cancelled on disconnect (no responses sent).
        handle_session_batch(&mut p2p_mock, &[], 1..=29, false).await;
        p2p_mock.announce_all_peers_disconnected();
        p2p_mock.expect_no_cmd().await;
        p2p_mock.announce_peer_connected();
        p2p_mock.expect_no_cmd().await;
        p2p_mock.announce_trusted_peer_connected();

        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);
        // An older head than the current subjective head does not lower it.
        respond_to.send(Ok(vec![header25])).unwrap();
        assert_syncing(&syncer, &store, &[30..=30], 30).await;

        // Init retries (with backoff) until a newer head is received.
        sleep(Duration::from_secs(1)).await;
        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);
        respond_to.send(Ok(vec![header35.clone()])).unwrap();
        assert_syncing(&syncer, &store, &[30..=30, 35..=35], 35).await;

        let head_from_syncer = p2p_mock.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, header35);

        let (height, amount, _respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 31);
        assert_eq!(amount, 4);

        p2p_mock.announce_all_peers_disconnected();
        p2p_mock.expect_no_cmd().await;
    }

    // Re-initialization succeeds even when the network head did not advance.
    #[async_test]
    async fn all_peers_disconnected_and_no_network_head_progress() {
        let mut gen = ExtendedHeaderGenerator::new_from_height(30);
        let header30 = gen.next();

        let (syncer, store, mut p2p_mock) = initialized_syncer(header30.clone()).await;
        handle_session_batch(&mut p2p_mock, &[], 1..=29, false).await;
        p2p_mock.announce_all_peers_disconnected();
        p2p_mock.expect_no_cmd().await;
        p2p_mock.announce_peer_connected();
        p2p_mock.expect_no_cmd().await;
        p2p_mock.announce_trusted_peer_connected();

        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);
        respond_to.send(Ok(vec![header30.clone()])).unwrap();
        assert_syncing(&syncer, &store, &[30..=30], 30).await;

        let head_from_syncer = p2p_mock.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, header30);

        handle_session_batch(&mut p2p_mock, &[], 1..=29, false).await;
        p2p_mock.announce_all_peers_disconnected();
        p2p_mock.expect_no_cmd().await;
    }

    // A response with a broken chain is rejected and the batch is retried.
    #[async_test]
    async fn non_contiguous_response() {
        let mut gen = ExtendedHeaderGenerator::new();
        let mut headers = gen.next_many(20);
        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[19].clone()).await;
        let header10 = headers[10].clone();
        // Corrupt the chain by duplicating a header.
        headers[10] = headers[11].clone();
        handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await;
        assert_syncing(&syncer, &store, &[20..=20], 20).await;
        // Restore and retry — now the batch is accepted.
        headers[10] = header10;
        handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await;
        assert_syncing(&syncer, &store, &[1..=20], 20).await;
    }

    // Headers from a different chain are rejected and the batch is retried.
    #[async_test]
    async fn another_chain_response() {
        let headers = ExtendedHeaderGenerator::new().next_many(20);
        let headers_prime = ExtendedHeaderGenerator::new().next_many(20);
        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[19].clone()).await;
        handle_session_batch(&mut p2p_mock, &headers_prime, 1..=19, true).await;
        assert_syncing(&syncer, &store, &[20..=20], 20).await;
        handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await;
        assert_syncing(&syncer, &store, &[1..=20], 20).await;
    }

    /// Assert the store contents and the syncer's reported state.
    async fn assert_syncing(
        syncer: &Syncer<InMemoryStore>,
        store: &InMemoryStore,
        expected_synced_ranges: &[RangeInclusive<u64>],
        expected_subjective_head: u64,
    ) {
        // Syncer is asynchronous; let it process pending messages first.
        sleep(Duration::from_millis(1)).await;
        let store_ranges = store.get_stored_header_ranges().await.unwrap();
        let syncing_info = syncer.info().await.unwrap();
        assert_eq!(store_ranges.as_ref(), expected_synced_ranges);
        assert_eq!(syncing_info.stored_headers.as_ref(), expected_synced_ranges);
        assert_eq!(syncing_info.subjective_head, expected_subjective_head);
    }

    /// Start a syncer against a mocked P2p and drive it through head
    /// initialization with the given header as the network head.
    async fn initialized_syncer(
        head: ExtendedHeader,
    ) -> (Syncer<InMemoryStore>, Arc<InMemoryStore>, MockP2pHandle) {
        let events = EventChannel::new();
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());

        let syncer = Syncer::start(SyncerArgs {
            p2p: Arc::new(mock),
            store: store.clone(),
            event_pub: events.publisher(),
            batch_size: 512,
        })
        .unwrap();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();
        handle.expect_no_cmd().await;
        handle.announce_trusted_peer_connected();

        let (height, amount, respond_to) = handle.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);
        respond_to.send(Ok(vec![head.clone()])).unwrap();

        let head_from_syncer = handle.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, head);

        let head_height = head.height().value();
        assert_syncing(&syncer, &store, &[head_height..=head_height], head_height).await;

        (syncer, store, handle)
    }

    /// Serve (or silently swallow, when `respond` is false) all header
    /// requests a session issues for `range`, verifying full coverage.
    async fn handle_session_batch(
        p2p_mock: &mut MockP2pHandle,
        remaining_headers: &[ExtendedHeader],
        range: BlockRange,
        respond: bool,
    ) {
        range.validate().unwrap();
        let mut ranges_to_request = BlockRanges::new();
        ranges_to_request.insert_relaxed(&range).unwrap();

        for _ in 0..requests_in_session(range.len()) {
            let (height, amount, respond_to) =
                p2p_mock.expect_header_request_for_height_cmd().await;
            let requested_range = height..=height + amount - 1;
            // Panics if the session requests something outside `range`.
            ranges_to_request.remove_strict(requested_range);
            if respond {
                let header_index = remaining_headers
                    .iter()
                    .position(|h| h.height().value() == height)
                    .expect("height not found in provided headers");
                let response_range =
                    remaining_headers[header_index..header_index + amount as usize].to_vec();
                respond_to
                    .send(Ok(response_range))
                    .map_err(|_| format!("headers [{}, {}]", height, height + amount - 1))
                    .unwrap();
            }
        }

        assert!(
            ranges_to_request.is_empty(),
            "Some headers weren't requested. expected range: {}, not requested: {}",
            range.display(),
            ranges_to_request
        );
    }

    /// Number of requests a header session issues for `headers` headers,
    /// mirroring the session's request-splitting strategy.
    fn requests_in_session(headers: u64) -> usize {
        let max_requests = headers.div_ceil(header_session::MAX_AMOUNT_PER_REQ) as usize;
        let min_requests = headers.div_ceil(header_session::MIN_AMOUNT_PER_REQ) as usize;
        if max_requests > header_session::MAX_CONCURRENT_REQS {
            max_requests
        } else {
            header_session::MAX_CONCURRENT_REQS.min(min_requests)
        }
    }

    // Test-only helper: remove `range` from `self`, panicking unless it is
    // fully contained in a single stored range.
    impl BlockRanges {
        fn remove_strict(&mut self, range: BlockRange) {
            for stored in self.as_ref() {
                if stored.contains(range.start()) && stored.contains(range.end()) {
                    self.remove_relaxed(range).unwrap();
                    return;
                }
            }
            panic!("block ranges ({self}) don't contain {}", range.display());
        }
    }
}