use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use super::types::ChunkWorkItem;
use super::ParallelIBDConfig;
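/// An inclusive range of block heights assigned to a single peer for download.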
#[derive(Debug, Clone)]
pub struct BlockChunk {
pub start_height: u64,
pub end_height: u64,
pub peer_id: String,
}
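/// Splits the inclusive range `start_height..=end_height` into `BlockChunk`s.
/// When earliest-first mode is enabled, more than one peer is connected, and scored
/// peers are available, every chunk is assigned to the highest-scored peer; otherwise
/// chunks are assigned round-robin across `peer_ids`. A sync starting at genesis gets
/// a smaller bootstrap chunk (at most 128 blocks) first.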
pub fn create_chunks(
config: &ParallelIBDConfig,
start_height: u64,
end_height: u64,
peer_ids: &[String],
scored_peers: Option<&[(String, f64)]>,
) -> Vec<BlockChunk> {
    let mut chunks = Vec::new();
    if peer_ids.is_empty() {
        tracing::warn!("IBD: no peers available for chunk assignment");
        return chunks;
    }
    let mut current_height = start_height;
    let num_peers = peer_ids.len();
let mut chunk_index: usize = 0;
let use_fastest = (config.mode.eq_ignore_ascii_case("earliest") || config.earliest_first)
&& num_peers > 1
&& scored_peers.map(|s| !s.is_empty()).unwrap_or(false);
let fastest_peer = if use_fastest {
scored_peers.and_then(|s| {
s.iter()
.max_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal))
.map(|(p, _)| p.clone())
})
} else {
None
};
if use_fastest && fastest_peer.is_some() {
tracing::info!("IBD: earliest-first — all chunks to fastest peer");
} else {
tracing::info!(
"Round-robin chunk assignment: {} peers, chunk_size={}",
num_peers,
config.chunk_size
);
}
while current_height <= end_height {
let (chunk_sz, is_bootstrap) = if current_height == 0 && start_height == 0 {
let sz = 128.min(end_height.saturating_add(1));
(sz, true)
        } else {
            // Guard against a zero chunk_size, which would otherwise never advance the loop.
            (config.chunk_size.max(1), false)
        };
        let chunk_end = current_height.saturating_add(chunk_sz - 1).min(end_height);
if is_bootstrap {
tracing::info!(
"IBD: bootstrap chunk 0-{} (99 and 100 in same chunk)",
chunk_end
);
}
let peer_id = fastest_peer
.clone()
.unwrap_or_else(|| peer_ids[chunk_index % num_peers].clone());
chunks.push(BlockChunk {
start_height: current_height,
end_height: chunk_end,
peer_id,
});
current_height = chunk_end + 1;
chunk_index += 1;
}
chunks
}
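/// Hands out block chunks to peers during parallel IBD. Tracks the chunk each peer
/// has in flight, keeps a retry queue for failed or stalled chunks, and, when syncing
/// from genesis, gates all later chunks behind completion of the bootstrap chunk.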
pub(crate) struct ChunkAssigner {
chunks: Vec<(u64, u64)>,
chunk_peers: Vec<String>,
next_index: AtomicUsize,
retry_queue: Mutex<VecDeque<ChunkWorkItem>>,
    validation_height: Arc<AtomicU64>,
bootstrap_complete: AtomicBool,
start_height: u64,
in_flight_per_peer: Mutex<HashMap<String, (u64, u64)>>,
}
impl ChunkAssigner {
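    /// Creates an assigner from pre-computed chunk ranges and the peers they were
    /// assigned to; panics if the two lists differ in length. When `start_height` is 0,
    /// only the bootstrap chunk is available until `mark_bootstrap_complete` is called.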
pub(crate) fn new(
chunks: Vec<(u64, u64)>,
chunk_peers: Vec<String>,
        validation_height: Arc<AtomicU64>,
start_height: u64,
) -> Self {
assert_eq!(
chunks.len(),
chunk_peers.len(),
"chunks and chunk_peers must match"
);
let bootstrap_complete = start_height > 0;
Self {
chunks,
chunk_peers,
next_index: AtomicUsize::new(0),
retry_queue: Mutex::new(VecDeque::new()),
validation_height,
bootstrap_complete: AtomicBool::new(bootstrap_complete),
start_height,
in_flight_per_peer: Mutex::new(HashMap::new()),
}
}
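    /// Opens the bootstrap gate so chunks beyond the first one can be handed out.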
pub(crate) fn mark_bootstrap_complete(&self) {
self.bootstrap_complete.store(true, Ordering::Relaxed);
}
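    /// Returns the next chunk for `peer_id`, or `None` when no work is currently
    /// eligible. A peer with a chunk already in flight gets nothing. Retry entries are
    /// preferred over fresh chunks, starting with a chunk that covers the next height
    /// the validator needs; fresh chunks only go to the peer they were pre-assigned to.
    /// Chunks starting more than `max_ahead` blocks past the current validation height,
    /// or blocked by the bootstrap gate, are held back.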
pub(crate) fn get_work(&self, peer_id: &str, max_ahead: u64) -> Option<(u64, u64)> {
let bootstrap_done = self.bootstrap_complete.load(Ordering::Relaxed);
let current_validation = self.validation_height.load(Ordering::Relaxed);
let next_needed = current_validation + 1;
let max_start = current_validation.saturating_add(max_ahead);
let allow_chunk = |start: u64| bootstrap_done || start == self.start_height;
let mut guard = self.in_flight_per_peer.lock().unwrap();
if guard.contains_key(peer_id) {
return None;
}
{
let mut retry = self.retry_queue.lock().unwrap();
            let critical = retry.iter().enumerate().find(|(_, (s, e, ex))| {
                *s <= next_needed
                    && next_needed <= *e
                    && ex.as_deref() != Some(peer_id)
                    && *s <= max_start
                    && allow_chunk(*s)
            });
if let Some((i, _)) = critical {
let (start, end, _) = retry.remove(i).unwrap();
guard.insert(peer_id.to_string(), (start, end));
return Some((start, end));
}
            let candidate = retry
                .iter()
                .enumerate()
                .filter(|(_, (_, _, ex))| ex.as_deref() != Some(peer_id))
                .filter(|(_, (s, _, _))| *s <= max_start && allow_chunk(*s))
                .min_by_key(|(_, (s, _, _))| *s);
if let Some((i, _)) = candidate {
let (start, end, _) = retry.remove(i).unwrap();
guard.insert(peer_id.to_string(), (start, end));
return Some((start, end));
}
}
let idx = self.next_index.load(Ordering::Relaxed);
if idx >= self.chunks.len() {
return None;
}
if !self.chunk_peers.is_empty() && self.chunk_peers[idx] != peer_id {
return None;
}
let (start, end) = self.chunks[idx];
        if start > max_start {
return None;
}
if !allow_chunk(start) {
return None;
}
self.next_index.store(idx + 1, Ordering::Relaxed);
guard.insert(peer_id.to_string(), (start, end));
Some((start, end))
}
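    /// Frees `peer_id`'s in-flight slot so the peer can be given another chunk.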
pub(crate) fn on_chunk_complete(&self, peer_id: &str) {
self.in_flight_per_peer.lock().unwrap().remove(peer_id);
}
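    /// Returns a chunk to the retry queue, optionally excluding the peer that failed it
    /// from picking it up again.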
pub(crate) fn requeue(&self, start: u64, end: u64, exclude_peer: Option<String>) {
self.retry_queue
.lock()
.unwrap()
.push_back((start, end, exclude_peer));
}
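    /// Requeues the pre-computed chunk containing `height` (stall recovery for a height
    /// that never arrived). Does nothing if the chunk is already queued, and logs a
    /// warning if the height falls outside every chunk.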
pub(crate) fn requeue_chunk_containing_height(&self, height: u64) {
let Some(&(start, end)) = self
.chunks
.iter()
.find(|(s, e)| height >= *s && height <= *e)
else {
tracing::warn!(
"stall recovery: height {} not in any assigner chunk (chunks={})",
height,
self.chunks.len()
);
return;
};
let mut rq = self.retry_queue.lock().unwrap();
if rq.iter().any(|(s, e, _)| *s == start && *e == end) {
return;
}
rq.push_back((start, end, None));
tracing::warn!(
"stall recovery: requeued chunk {}-{} for missing height {}",
start,
end,
height
);
}
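    /// True once every chunk has been handed out and the retry queue is empty; chunks
    /// still in flight are not counted.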
pub(crate) fn is_done(&self) -> bool {
let idx = self.next_index.load(Ordering::Relaxed);
idx >= self.chunks.len() && self.retry_queue.lock().unwrap().is_empty()
}
pub(crate) fn total_chunks(&self) -> usize {
self.chunks.len()
}
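    /// Chunks not yet handed out plus chunks waiting in the retry queue.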
pub(crate) fn remaining_count(&self) -> usize {
let idx = self.next_index.load(Ordering::Relaxed);
let retry_len = self.retry_queue.lock().unwrap().len();
self.chunks.len().saturating_sub(idx) + retry_len
}
}
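/// RAII guard for a chunk handed to a download task. If dropped without being
/// disarmed (for example on error or cancellation), it requeues the chunk and
/// releases the peer's in-flight slot.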
pub(crate) struct ChunkGuard {
chunk: Option<ChunkWorkItem>,
peer_id: Option<String>,
assigner: Arc<ChunkAssigner>,
}
impl ChunkGuard {
pub(crate) fn new(
start: u64,
end: u64,
exclude: Option<String>,
peer_id: String,
assigner: Arc<ChunkAssigner>,
) -> Self {
Self {
chunk: Some((start, end, exclude)),
peer_id: Some(peer_id),
assigner,
}
}
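    /// Marks the chunk as handled: dropping the guard will no longer requeue the chunk
    /// or release the peer's in-flight slot.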
    pub(crate) fn disarm(&mut self) {
        self.chunk = None;
        self.peer_id = None;
    }
}
impl Drop for ChunkGuard {
fn drop(&mut self) {
if let Some((start, end, exclude)) = self.chunk.take() {
self.assigner.requeue(start, end, exclude);
}
if let Some(peer_id) = self.peer_id.take() {
self.assigner.on_chunk_complete(&peer_id);
}
}
}
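// A minimal test sketch of the assignment flow above. It uses only types defined in
// this module; the exact validation-height convention (0 = nothing validated past
// genesis yet) is an assumption made for illustration.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicU64;

    #[test]
    fn bootstrap_gate_holds_back_later_chunks() {
        let validation_height = Arc::new(AtomicU64::new(0));
        let assigner = ChunkAssigner::new(
            vec![(0, 99), (100, 199)],
            vec!["peer-a".to_string(), "peer-b".to_string()],
            validation_height,
            0,
        );
        // Only the bootstrap chunk (starting at start_height) is available at first.
        assert_eq!(assigner.get_work("peer-a", 1_000), Some((0, 99)));
        assert_eq!(assigner.get_work("peer-b", 1_000), None);
        // Once bootstrap completes, the next chunk goes to its pre-assigned peer.
        assigner.mark_bootstrap_complete();
        assert_eq!(assigner.get_work("peer-b", 1_000), Some((100, 199)));
        assert_eq!(assigner.remaining_count(), 0);
    }
}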