use celestia_proto::p2p::pb::HeaderRequest;
use celestia_types::ExtendedHeader;
use futures::FutureExt;
use futures::future::BoxFuture;
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::sync::{mpsc, oneshot};
use tracing::debug;
use crate::block_ranges::{BlockRange, BlockRangeExt};
use crate::p2p::header_ex::utils::HeaderRequestExt;
use crate::p2p::{P2pCmd, P2pError};
/// Lower bound applied to the per-request batch size computed in `HeaderSession::new`.
pub(crate) const MIN_AMOUNT_PER_REQ: u64 = 8;
/// Upper bound applied to the per-request batch size computed in `HeaderSession::new`.
pub(crate) const MAX_AMOUNT_PER_REQ: u64 = 64;
/// Number of batch requests `HeaderSession::run` issues up front; it is also the divisor
/// used when splitting the range length into a batch size.
pub(crate) const MAX_CONCURRENT_REQS: usize = 8;
/// Result alias whose error type defaults to `P2pError`.
type Result<T, E = P2pError> = std::result::Result<T, E>;
/// Output of one request task: `(height, amount, result)`, i.e. the starting height and
/// amount that were requested, followed by the outcome of that request.
type TaskResult = (u64, u64, Result<Vec<ExtendedHeader>>);
/// State of a single header-fetching session over one `BlockRange`.
///
/// The range is split into batches that are requested through `cmd_tx`; the pending
/// request futures are kept in `tasks` until they resolve.
pub(crate) struct HeaderSession {
/// Part of the range that has not been handed out as a batch yet; `None` once the
/// whole range has been taken.
to_fetch: Option<BlockRange>,
/// Channel on which `P2pCmd::HeaderExRequest` commands are sent.
cmd_tx: mpsc::Sender<P2pCmd>,
/// Request futures that have been queued and not yet yielded a `TaskResult`.
tasks: FuturesUnordered<BoxFuture<'static, TaskResult>>,
/// Maximum number of headers asked for in one batch, fixed in `new`.
batch_size: u64,
}
impl HeaderSession {
    /// Creates a session that will fetch every header in `range`.
    ///
    /// The batch size is the range length divided (rounding up) by
    /// `MAX_CONCURRENT_REQS`, clamped to `MIN_AMOUNT_PER_REQ..=MAX_AMOUNT_PER_REQ`.
    /// No request is sent until [`HeaderSession::run`] is called.
    pub(crate) fn new(range: BlockRange, cmd_tx: mpsc::Sender<P2pCmd>) -> Self {
        let per_request = range.len().div_ceil(MAX_CONCURRENT_REQS as u64);
        let batch_size = per_request.clamp(MIN_AMOUNT_PER_REQ, MAX_AMOUNT_PER_REQ);

        Self {
            to_fetch: Some(range),
            cmd_tx,
            tasks: FuturesUnordered::new(),
            batch_size,
        }
    }

    /// Drives the session until no request is pending and returns the collected headers.
    ///
    /// Up to `MAX_CONCURRENT_REQS` batches are queued first; afterwards every finished
    /// request either queues the next batch, re-requests the part of its batch that was
    /// not delivered, or (on a `P2pError::HeaderEx` error) repeats the same request.
    ///
    /// The returned vector is the concatenation of all received spans, ordered by the
    /// height of each span's first header.
    ///
    /// # Errors
    ///
    /// Any error other than `P2pError::HeaderEx` aborts the session and is returned as is.
    pub(crate) async fn run(&mut self) -> Result<Vec<ExtendedHeader>> {
        let mut spans = Vec::new();

        for _ in 0..MAX_CONCURRENT_REQS {
            self.send_next_request().await;
        }

        loop {
            let Some((origin, requested_amount, outcome)) = self.tasks.next().await else {
                break;
            };

            let headers = match outcome {
                Ok(headers) => headers,
                Err(P2pError::HeaderEx(e)) => {
                    // HeaderEx failures are not fatal: ask for the very same batch again.
                    debug!("HeaderEx error: {e}");
                    self.send_request(origin, requested_amount).await;
                    continue;
                }
                Err(fatal) => return Err(fatal),
            };

            let headers_len = headers.len() as u64;

            // Empty spans are dropped here, which the sort below relies on.
            if headers_len > 0 {
                spans.push(headers);
            }

            if headers_len >= requested_amount {
                self.send_next_request().await;
                continue;
            }

            // Short response: request only the part that is still missing.
            let height = origin + headers_len;
            let amount = requested_amount - headers_len;
            self.send_request(height, amount).await;
            debug!(
                "requested {requested_amount}, got {headers_len}: retrying {height} +{amount}"
            );
        }

        spans.sort_unstable_by_key(|span| {
            let first = span
                .first()
                .expect("empty spans aren't added in receiving loop");
            first.height()
        });

        let mut collected = Vec::new();
        for span in spans {
            collected.extend(span);
        }
        Ok(collected)
    }

    /// Takes the next batch off the remaining range and queues a request for it.
    ///
    /// Does nothing when `take_next_batch` yields no batch.
    pub(crate) async fn send_next_request(&mut self) {
        let Some(batch) = take_next_batch(&mut self.to_fetch, self.batch_size) else {
            return;
        };

        self.send_request(*batch.start(), batch.len()).await;
    }

    /// Queues a request for `amount` headers starting at `height`.
    ///
    /// The command is sent from inside the queued future, so nothing reaches `cmd_tx`
    /// until `tasks` is polled. The future resolves to `(height, amount, result)`;
    /// `result` is `P2pError::WorkerDied` if the command could not be sent.
    pub(crate) async fn send_request(&mut self, height: u64, amount: u64) {
        debug!("Fetching batch {} until {}", height, height + amount - 1);

        let cmd_tx = self.cmd_tx.clone();
        let request = HeaderRequest::with_origin(height, amount);

        let task = async move {
            let (respond_to, response) = oneshot::channel();
            let cmd = P2pCmd::HeaderExRequest {
                request,
                respond_to,
            };

            let result: Result<Vec<ExtendedHeader>> = match cmd_tx.send(cmd).await {
                Err(_) => Err(P2pError::WorkerDied),
                Ok(()) => match response.await {
                    Ok(reply) => reply,
                    Err(recv_err) => Err(P2pError::from(recv_err)),
                },
            };

            (height, amount, result)
        };

        self.tasks.push(task.boxed());
    }
}
/// Splits off the next batch of at most `limit` heights from the end of `range_to_fetch`.
///
/// * `limit == 0` returns `None` and leaves `range_to_fetch` untouched.
/// * `range_to_fetch == None` returns `None`.
/// * If the remaining range holds at most `limit` heights, it is returned whole and
///   `range_to_fetch` becomes `None`.
/// * Otherwise the last `limit` heights are returned and `range_to_fetch` keeps the
///   lower part of the range.
fn take_next_batch(range_to_fetch: &mut Option<BlockRange>, limit: u64) -> Option<BlockRange> {
    // Checked before `take()` so that a zero limit does not consume the pending range.
    let end_offset = limit.checked_sub(1)?;
    let pending = range_to_fetch.take()?;

    if pending.len() <= limit {
        return Some(pending);
    }

    let (start, end) = (*pending.start(), *pending.end());
    *range_to_fetch = Some(start..=end - limit);
    Some(end - end_offset..=end)
}
// Tests for `HeaderSession` (driven through a mocked `P2p` command channel) and for the
// pure `take_next_batch` helper.
#[cfg(test)]
mod tests {
use super::*;
use crate::p2p::{HeaderExError, P2p};
use celestia_types::test_utils::ExtendedHeaderGenerator;
use lumina_utils::executor::spawn;
use lumina_utils::test_utils::async_test;
// Runs a session over `1..=to_fetch` and checks that it computes `batch_size` and issues
// exactly `batches` requests: `batches - 1` full batches taken from the end of the range
// downwards, followed by one request at height 1 for whatever is left over.
async fn test_batching(to_fetch: u64, batches: usize, batch_size: u64) {
let (_p2p, mut p2p_mock) = P2p::mocked();
let mut generator = ExtendedHeaderGenerator::new();
let headers = generator.next_many(to_fetch);
let mut session = HeaderSession::new(1..=to_fetch, p2p_mock.cmd_tx.clone());
assert_eq!(session.batch_size, batch_size);
// The session runs in a spawned task; its result is handed back over a oneshot channel.
let (result_tx, result_rx) = oneshot::channel();
spawn(async move {
let res = session.run().await;
result_tx.send(res).unwrap();
});
let full_batches = batches - 1;
for i in 1..=full_batches {
let (height, amount, respond_to) =
p2p_mock.expect_header_request_for_height_cmd().await;
// The i-th request is expected to cover the i-th `batch_size` chunk counted from the end.
let end_offset = i as u64 * batch_size;
let expected_start = 1 + to_fetch - end_offset;
assert_eq!(height, expected_start);
assert_eq!(amount, batch_size);
// Heights start at 1 while `headers` is indexed from 0.
let start = (height - 1) as usize;
let end = start + amount as usize;
respond_to.send(Ok(headers[start..end].to_vec())).unwrap();
}
// Whatever the full batches did not cover must be requested last, starting at height 1.
let leftover = to_fetch - batch_size * full_batches as u64;
if leftover > 0 {
let (height, amount, respond_to) =
p2p_mock.expect_header_request_for_height_cmd().await;
assert_eq!(height, 1);
assert_eq!(amount, leftover);
let start = (height - 1) as usize;
let end = start + amount as usize;
respond_to.send(Ok(headers[start..end].to_vec())).unwrap();
}
p2p_mock.expect_no_cmd().await;
// The session must return the headers in their original order.
let received_headers = result_rx.await.unwrap().unwrap();
assert_eq!(headers, received_headers);
}
// Table of (range length, expected request count, expected batch size) cases.
#[async_test]
async fn split_range_to_batches() {
// Up to 64 headers the batch size stays at the minimum.
test_batching(1, 1, MIN_AMOUNT_PER_REQ).await;
test_batching(5, 1, MIN_AMOUNT_PER_REQ).await;
test_batching(7, 1, MIN_AMOUNT_PER_REQ).await;
test_batching(8, 1, MIN_AMOUNT_PER_REQ).await;
test_batching(10, 2, MIN_AMOUNT_PER_REQ).await;
test_batching(16, 2, MIN_AMOUNT_PER_REQ).await;
test_batching(30, 4, MIN_AMOUNT_PER_REQ).await;
test_batching(50, 7, MIN_AMOUNT_PER_REQ).await;
test_batching(63, 8, MIN_AMOUNT_PER_REQ).await;
test_batching(64, 8, MIN_AMOUNT_PER_REQ).await;
// From 65 to 512 headers the batch size grows and the request count stays at the maximum.
test_batching(65, MAX_CONCURRENT_REQS, 9).await;
test_batching(128, MAX_CONCURRENT_REQS, 16).await;
test_batching(129, MAX_CONCURRENT_REQS, 17).await;
test_batching(256, MAX_CONCURRENT_REQS, 32).await;
test_batching(500, MAX_CONCURRENT_REQS, 63).await;
test_batching(512, MAX_CONCURRENT_REQS, 64).await;
// Above 512 headers the batch size is capped and more requests are needed.
test_batching(520, 9, MAX_AMOUNT_PER_REQ).await;
test_batching(600, 10, MAX_AMOUNT_PER_REQ).await;
test_batching(1024, 16, MAX_AMOUNT_PER_REQ).await;
}
// A response shorter than requested must trigger a follow-up request for the missing tail.
#[async_test]
async fn retry_on_missing_range() {
let (_p2p, mut p2p_mock) = P2p::mocked();
let mut generator = ExtendedHeaderGenerator::new();
let headers = generator.next_many(8);
let mut session = HeaderSession::new(1..=8, p2p_mock.cmd_tx.clone());
let (result_tx, result_rx) = oneshot::channel();
spawn(async move {
let res = session.run().await;
result_tx.send(res).unwrap();
});
let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
assert_eq!(height, 1);
assert_eq!(amount, 8);
// Answer with only the first 6 of the 8 requested headers.
respond_to.send(Ok(headers[..6].to_vec())).unwrap();
// The session is expected to ask for the remaining 2 headers, starting at height 7.
let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
assert_eq!(height, 7);
assert_eq!(amount, 2);
respond_to.send(Ok(headers[6..8].to_vec())).unwrap();
p2p_mock.expect_no_cmd().await;
let received_headers = result_rx.await.unwrap().unwrap();
assert_eq!(headers, received_headers);
}
// A `HeaderExError::HeaderNotFound` response must lead to the same request being repeated
// instead of failing the session.
#[async_test]
async fn not_found_is_not_fatal() {
let (_p2p, mut p2p_mock) = P2p::mocked();
let mut generator = ExtendedHeaderGenerator::new();
let headers = generator.next_many(8);
let mut session = HeaderSession::new(1..=8, p2p_mock.cmd_tx.clone());
let (result_tx, result_rx) = oneshot::channel();
spawn(async move {
let res = session.run().await;
result_tx.send(res).unwrap();
});
let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
assert_eq!(height, 1);
assert_eq!(amount, 8);
respond_to
.send(Err(P2pError::HeaderEx(HeaderExError::HeaderNotFound)))
.unwrap();
// Same height and amount are expected again after the error.
let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
assert_eq!(height, 1);
assert_eq!(amount, 8);
respond_to.send(Ok(headers.clone())).unwrap();
p2p_mock.expect_no_cmd().await;
let received_headers = result_rx.await.unwrap().unwrap();
assert_eq!(headers, received_headers);
}
// A limit larger than the range returns the whole range and leaves nothing to fetch.
#[test]
fn take_next_batch_full_batch() {
let mut range_to_fetch = Some(1..=10);
let batch = take_next_batch(&mut range_to_fetch, 16);
assert_eq!(batch, Some(1..=10));
assert_eq!(range_to_fetch, None);
}
// A limit equal to the range length also returns the whole range.
#[test]
fn take_next_batch_equal_limit() {
let mut range_to_fetch = Some(1..=10);
let batch = take_next_batch(&mut range_to_fetch, 10);
assert_eq!(batch, Some(1..=10));
assert_eq!(range_to_fetch, None);
}
// A limit smaller than the range returns the upper `limit` heights and keeps the rest.
#[test]
fn take_next_batch_truncated_batch() {
let mut range_to_fetch = Some(1..=10);
let batch = take_next_batch(&mut range_to_fetch, 5);
assert_eq!(batch, Some(6..=10));
assert_eq!(range_to_fetch, Some(1..=5));
}
// Repeatedly taking 64-height batches from 1..=512 walks down the range without gaps or
// overlaps until nothing is left.
#[test]
fn take_next_batch_truncated_calc() {
let mut range_to_fetch = Some(1..=512);
let batch = take_next_batch(&mut range_to_fetch, 64);
assert_eq!(batch, Some(449..=512));
assert_eq!(range_to_fetch, Some(1..=448));
let batch = take_next_batch(&mut range_to_fetch, 64);
assert_eq!(batch, Some(385..=448));
assert_eq!(range_to_fetch, Some(1..=384));
let batch = take_next_batch(&mut range_to_fetch, 64);
assert_eq!(batch, Some(321..=384));
assert_eq!(range_to_fetch, Some(1..=320));
let batch = take_next_batch(&mut range_to_fetch, 64);
assert_eq!(batch, Some(257..=320));
assert_eq!(range_to_fetch, Some(1..=256));
let batch = take_next_batch(&mut range_to_fetch, 64);
assert_eq!(batch, Some(193..=256));
assert_eq!(range_to_fetch, Some(1..=192));
let batch = take_next_batch(&mut range_to_fetch, 64);
assert_eq!(batch, Some(129..=192));
assert_eq!(range_to_fetch, Some(1..=128));
let batch = take_next_batch(&mut range_to_fetch, 64);
assert_eq!(batch, Some(65..=128));
assert_eq!(range_to_fetch, Some(1..=64));
let batch = take_next_batch(&mut range_to_fetch, 64);
assert_eq!(batch, Some(1..=64));
assert_eq!(range_to_fetch, None);
}
// With nothing left to fetch, no batch is produced.
#[test]
fn take_next_batch_none() {
let mut range_to_fetch = None;
let batch = take_next_batch(&mut range_to_fetch, 5);
assert_eq!(batch, None);
assert_eq!(range_to_fetch, None);
}
// A zero limit produces no batch and must not consume the pending range.
#[test]
fn take_next_batch_zero_batch() {
let mut range_to_fetch = Some(1..=5);
let batch = take_next_batch(&mut range_to_fetch, 0);
assert_eq!(batch, None);
assert_eq!(range_to_fetch, Some(1..=5));
}
}