1use crate::LOG_TARGET;
8use log::trace;
9use soil_network::common::sync::message;
10use soil_network::types::PeerId;
11use std::{
12 cmp,
13 collections::{BTreeMap, HashMap},
14 ops::Range,
15};
16use subsoil::runtime::traits::{Block as BlockT, NumberFor, One};
17
18#[derive(Debug, Clone, PartialEq, Eq)]
20pub struct BlockData<B: BlockT> {
21 pub block: message::BlockData<B>,
23 pub origin: Option<PeerId>,
25}
26
27#[derive(Debug)]
28enum BlockRangeState<B: BlockT> {
29 Downloading { len: NumberFor<B>, downloading: u32 },
30 Complete(Vec<BlockData<B>>),
31 Queued { len: NumberFor<B> },
32}
33
34impl<B: BlockT> BlockRangeState<B> {
35 pub fn len(&self) -> NumberFor<B> {
36 match *self {
37 Self::Downloading { len, .. } => len,
38 Self::Complete(ref blocks) => (blocks.len() as u32).into(),
39 Self::Queued { len } => len,
40 }
41 }
42}
43
44#[derive(Default)]
46pub struct BlockCollection<B: BlockT> {
47 blocks: BTreeMap<NumberFor<B>, BlockRangeState<B>>,
49 peer_requests: HashMap<PeerId, NumberFor<B>>,
50 queued_blocks: HashMap<B::Hash, (NumberFor<B>, NumberFor<B>)>,
53}
54
55impl<B: BlockT> BlockCollection<B> {
56 pub fn new() -> Self {
58 Self {
59 blocks: BTreeMap::new(),
60 peer_requests: HashMap::new(),
61 queued_blocks: HashMap::new(),
62 }
63 }
64
65 pub fn clear(&mut self) {
67 self.blocks.clear();
68 self.peer_requests.clear();
69 }
70
71 pub fn insert(&mut self, start: NumberFor<B>, blocks: Vec<message::BlockData<B>>, who: PeerId) {
73 if blocks.is_empty() {
74 return;
75 }
76
77 match self.blocks.get(&start) {
78 Some(&BlockRangeState::Downloading { .. }) => {
79 trace!(target: LOG_TARGET, "Inserting block data still marked as being downloaded: {}", start);
80 },
81 Some(BlockRangeState::Complete(existing)) if existing.len() >= blocks.len() => {
82 trace!(target: LOG_TARGET, "Ignored block data already downloaded: {}", start);
83 return;
84 },
85 _ => (),
86 }
87
88 self.blocks.insert(
89 start,
90 BlockRangeState::Complete(
91 blocks.into_iter().map(|b| BlockData { origin: Some(who), block: b }).collect(),
92 ),
93 );
94 }
95
96 pub fn needed_blocks(
99 &mut self,
100 who: PeerId,
101 count: u32,
102 peer_best: NumberFor<B>,
103 common: NumberFor<B>,
104 max_parallel: u32,
105 max_ahead: u32,
106 ) -> Option<Range<NumberFor<B>>> {
107 if peer_best <= common {
108 return None;
110 }
111 let first_different = common + <NumberFor<B>>::one();
113 let count = (count as u32).into();
114 let (mut range, downloading) = {
115 let mut downloading_iter = self.blocks.iter().peekable();
117 let mut prev: Option<(&NumberFor<B>, &BlockRangeState<B>)> = None;
118 loop {
119 let next = downloading_iter.next();
120 break match (prev, next) {
121 (Some((start, &BlockRangeState::Downloading { ref len, downloading })), _)
126 if downloading < max_parallel && *start >= first_different =>
127 {
128 (*start..*start + *len, downloading)
129 },
130 (Some((start, r)), Some((next_start, _)))
133 if *start + r.len() < *next_start
134 && *start + r.len() >= first_different =>
135 {
136 (*start + r.len()..cmp::min(*next_start, *start + r.len() + count), 0)
137 },
138 (Some((start, r)), None) if *start + r.len() >= first_different => {
141 (*start + r.len()..*start + r.len() + count, 0)
142 },
143 (None, None) => (first_different..first_different + count, 0),
146 (None, Some((start, _))) if *start > first_different => {
148 (first_different..cmp::min(first_different + count, *start), 0)
149 },
150 _ => {
152 prev = next;
153 continue;
154 },
155 };
156 }
157 };
158 if range.start > peer_best {
160 trace!(target: LOG_TARGET, "Out of range for peer {} ({} vs {})", who, range.start, peer_best);
161 return None;
162 }
163 range.end = cmp::min(peer_best + One::one(), range.end);
164
165 if self
166 .blocks
167 .iter()
168 .next()
169 .map_or(false, |(n, _)| range.start > *n + max_ahead.into())
170 {
171 trace!(target: LOG_TARGET, "Too far ahead for peer {} ({})", who, range.start);
172 return None;
173 }
174
175 self.peer_requests.insert(who, range.start);
176 self.blocks.insert(
177 range.start,
178 BlockRangeState::Downloading {
179 len: range.end - range.start,
180 downloading: downloading + 1,
181 },
182 );
183 if range.end <= range.start {
184 panic!(
185 "Empty range {:?}, count={}, peer_best={}, common={}, blocks={:?}",
186 range, count, peer_best, common, self.blocks
187 );
188 }
189 Some(range)
190 }
191
192 pub fn ready_blocks(&mut self, from: NumberFor<B>) -> Vec<BlockData<B>> {
198 let mut ready = Vec::new();
199
200 let mut prev = from;
201 for (&start, range_data) in &mut self.blocks {
202 if start > prev {
203 break;
204 }
205 let len = match range_data {
206 BlockRangeState::Complete(blocks) => {
207 let len = (blocks.len() as u32).into();
208 prev = start + len;
209 if let Some(BlockData { block, .. }) = blocks.first() {
210 self.queued_blocks
211 .insert(block.hash, (start, start + (blocks.len() as u32).into()));
212 }
213 ready.append(blocks);
215 len
216 },
217 BlockRangeState::Queued { .. } => continue,
218 _ => break,
219 };
220 *range_data = BlockRangeState::Queued { len };
221 }
222 trace!(target: LOG_TARGET, "{} blocks ready for import", ready.len());
223 ready
224 }
225
226 pub fn clear_queued(&mut self, hash: &B::Hash) {
227 if let Some((from, to)) = self.queued_blocks.remove(hash) {
228 let mut block_num = from;
229 while block_num < to {
230 self.blocks.remove(&block_num);
231 block_num += One::one();
232 }
233 trace!(target: LOG_TARGET, "Cleared blocks from {:?} to {:?}", from, to);
234 }
235 }
236
237 pub fn clear_peer_download(&mut self, who: &PeerId) {
238 if let Some(start) = self.peer_requests.remove(who) {
239 let remove = match self.blocks.get_mut(&start) {
240 Some(&mut BlockRangeState::Downloading { ref mut downloading, .. })
241 if *downloading > 1 =>
242 {
243 *downloading -= 1;
244 false
245 },
246 Some(&mut BlockRangeState::Downloading { .. }) => true,
247 _ => false,
248 };
249 if remove {
250 self.blocks.remove(&start);
251 }
252 }
253 }
254}
255
256#[cfg(test)]
257mod test {
258 use super::{BlockCollection, BlockData, BlockRangeState};
259 use soil_network::common::sync::message;
260 use soil_network::types::PeerId;
261 use subsoil::core::H256;
262 use subsoil::runtime::testing::{Block as RawBlock, MockCallU64, TestXt};
263
    /// Concrete block type used by the tests in this module.
    type Block = RawBlock<TestXt<MockCallU64, ()>>;
265
266 fn is_empty(bc: &BlockCollection<Block>) -> bool {
267 bc.blocks.is_empty() && bc.peer_requests.is_empty()
268 }
269
270 fn generate_blocks(n: usize) -> Vec<message::BlockData<Block>> {
271 (0..n)
272 .map(|_| message::generic::BlockData {
273 hash: H256::random(),
274 header: None,
275 body: None,
276 indexed_body: None,
277 message_queue: None,
278 receipt: None,
279 justification: None,
280 justifications: None,
281 })
282 .collect()
283 }
284
285 #[test]
286 fn create_clear() {
287 let mut bc = BlockCollection::new();
288 assert!(is_empty(&bc));
289 bc.insert(1, generate_blocks(100), PeerId::random());
290 assert!(!is_empty(&bc));
291 bc.clear();
292 assert!(is_empty(&bc));
293 }
294
295 #[test]
296 fn insert_blocks() {
297 let mut bc = BlockCollection::new();
298 assert!(is_empty(&bc));
299 let peer0 = PeerId::random();
300 let peer1 = PeerId::random();
301 let peer2 = PeerId::random();
302
303 let blocks = generate_blocks(150);
304 assert_eq!(bc.needed_blocks(peer0, 40, 150, 0, 1, 200), Some(1..41));
305 assert_eq!(bc.needed_blocks(peer1, 40, 150, 0, 1, 200), Some(41..81));
306 assert_eq!(bc.needed_blocks(peer2, 40, 150, 0, 1, 200), Some(81..121));
307
308 bc.clear_peer_download(&peer1);
309 bc.insert(41, blocks[41..81].to_vec(), peer1);
310 assert_eq!(bc.ready_blocks(1), vec![]);
311 assert_eq!(bc.needed_blocks(peer1, 40, 150, 0, 1, 200), Some(121..151));
312 bc.clear_peer_download(&peer0);
313 bc.insert(1, blocks[1..11].to_vec(), peer0);
314
315 assert_eq!(bc.needed_blocks(peer0, 40, 150, 0, 1, 200), Some(11..41));
316 assert_eq!(
317 bc.ready_blocks(1),
318 blocks[1..11]
319 .iter()
320 .map(|b| BlockData { block: b.clone(), origin: Some(peer0) })
321 .collect::<Vec<_>>()
322 );
323
324 bc.clear_peer_download(&peer0);
325 bc.insert(11, blocks[11..41].to_vec(), peer0);
326
327 let ready = bc.ready_blocks(12);
328 assert_eq!(
329 ready[..30],
330 blocks[11..41]
331 .iter()
332 .map(|b| BlockData { block: b.clone(), origin: Some(peer0) })
333 .collect::<Vec<_>>()[..]
334 );
335 assert_eq!(
336 ready[30..],
337 blocks[41..81]
338 .iter()
339 .map(|b| BlockData { block: b.clone(), origin: Some(peer1) })
340 .collect::<Vec<_>>()[..]
341 );
342
343 bc.clear_peer_download(&peer2);
344 assert_eq!(bc.needed_blocks(peer2, 40, 150, 80, 1, 200), Some(81..121));
345 bc.clear_peer_download(&peer2);
346 bc.insert(81, blocks[81..121].to_vec(), peer2);
347 bc.clear_peer_download(&peer1);
348 bc.insert(121, blocks[121..150].to_vec(), peer1);
349
350 assert_eq!(bc.ready_blocks(80), vec![]);
351 let ready = bc.ready_blocks(81);
352 assert_eq!(
353 ready[..40],
354 blocks[81..121]
355 .iter()
356 .map(|b| BlockData { block: b.clone(), origin: Some(peer2) })
357 .collect::<Vec<_>>()[..]
358 );
359 assert_eq!(
360 ready[40..],
361 blocks[121..150]
362 .iter()
363 .map(|b| BlockData { block: b.clone(), origin: Some(peer1) })
364 .collect::<Vec<_>>()[..]
365 );
366 }
367
368 #[test]
369 fn large_gap() {
370 let mut bc: BlockCollection<Block> = BlockCollection::new();
371 bc.blocks.insert(100, BlockRangeState::Downloading { len: 128, downloading: 1 });
372 let blocks = generate_blocks(10)
373 .into_iter()
374 .map(|b| BlockData { block: b, origin: None })
375 .collect();
376 bc.blocks.insert(114305, BlockRangeState::Complete(blocks));
377
378 let peer0 = PeerId::random();
379 assert_eq!(bc.needed_blocks(peer0, 128, 10000, 0, 1, 200), Some(1..100));
380 assert_eq!(bc.needed_blocks(peer0, 128, 10000, 0, 1, 200), None); assert_eq!(
382 bc.needed_blocks(peer0, 128, 10000, 0, 1, 200000),
383 Some(100 + 128..100 + 128 + 128)
384 );
385 }
386
387 #[test]
388 fn no_duplicate_requests_on_fork() {
389 let mut bc = BlockCollection::new();
390 assert!(is_empty(&bc));
391 let peer = PeerId::random();
392
393 let blocks = generate_blocks(10);
394
395 assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(40..45));
397
398 bc.clear_peer_download(&peer);
400 bc.insert(40, blocks[..5].to_vec(), peer);
401
402 let ready = bc.ready_blocks(48);
404 assert_eq!(
405 ready,
406 blocks[..5]
407 .iter()
408 .map(|b| BlockData { block: b.clone(), origin: Some(peer) })
409 .collect::<Vec<_>>()
410 );
411
412 assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(45..50));
413 }
414
415 #[test]
416 fn clear_queued_subsequent_ranges() {
417 let mut bc = BlockCollection::new();
418 assert!(is_empty(&bc));
419 let peer = PeerId::random();
420
421 let blocks = generate_blocks(10);
422
423 assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(40..45));
425 assert_eq!(bc.needed_blocks(peer, 5, 50, 39, 0, 200), Some(45..50));
426
427 bc.clear_peer_download(&peer);
429 bc.insert(40, blocks.to_vec(), peer);
430
431 let ready = bc.ready_blocks(1000);
433 assert_eq!(
434 ready,
435 blocks
436 .iter()
437 .map(|b| BlockData { block: b.clone(), origin: Some(peer) })
438 .collect::<Vec<_>>()
439 );
440
441 bc.clear_queued(&blocks[0].hash);
442 assert!(bc.blocks.is_empty());
443 assert!(bc.queued_blocks.is_empty());
444 }
445
446 #[test]
447 fn downloaded_range_is_requested_from_max_parallel_peers() {
448 let mut bc = BlockCollection::new();
449 assert!(is_empty(&bc));
450
451 let count = 5;
452 let max_parallel = 2;
454 let max_ahead = 200;
455
456 let peer1 = PeerId::random();
457 let peer2 = PeerId::random();
458 let peer3 = PeerId::random();
459
460 let best = 100;
462 let common = 10;
463
464 assert_eq!(
465 bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead),
466 Some(11..16)
467 );
468 assert_eq!(
469 bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
470 Some(11..16)
471 );
472 assert_eq!(
473 bc.needed_blocks(peer3, count, best, common, max_parallel, max_ahead),
474 Some(16..21)
475 );
476 }
477 #[test]
478 fn downloaded_range_not_requested_from_peers_with_higher_common_number() {
479 let mut bc = BlockCollection::new();
486 assert!(is_empty(&bc));
487
488 let count = 5;
489 let max_parallel = 2;
490 let max_ahead = 200;
491
492 let peer1 = PeerId::random();
493 let peer1_best = 20;
494 let peer1_common = 10;
495
496 let peer2 = PeerId::random();
498 let peer2_best = 20;
499 let peer2_common = 11; assert_eq!(
502 bc.needed_blocks(peer1, count, peer1_best, peer1_common, max_parallel, max_ahead),
503 Some(11..16),
504 );
505 assert_eq!(
506 bc.needed_blocks(peer2, count, peer2_best, peer2_common, max_parallel, max_ahead),
507 Some(16..21),
508 );
509 }
510
511 #[test]
512 fn gap_above_common_number_requested() {
513 let mut bc = BlockCollection::new();
514 assert!(is_empty(&bc));
515
516 let count = 5;
517 let best = 30;
518 let max_parallel = 1;
521 let max_ahead = 200;
522
523 let peer1 = PeerId::random();
524 let peer2 = PeerId::random();
525 let peer3 = PeerId::random();
526
527 let common = 10;
528 assert_eq!(
529 bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead),
530 Some(11..16),
531 );
532 assert_eq!(
533 bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
534 Some(16..21),
535 );
536 assert_eq!(
537 bc.needed_blocks(peer3, count, best, common, max_parallel, max_ahead),
538 Some(21..26),
539 );
540
541 bc.clear_peer_download(&peer2);
544
545 assert_eq!(
547 bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
548 Some(16..21),
549 );
550 }
551
552 #[test]
553 fn gap_below_common_number_not_requested() {
554 let mut bc = BlockCollection::new();
555 assert!(is_empty(&bc));
556
557 let count = 5;
558 let best = 30;
559 let max_parallel = 1;
562 let max_ahead = 200;
563
564 let peer1 = PeerId::random();
565 let peer2 = PeerId::random();
566 let peer3 = PeerId::random();
567
568 let common = 10;
569 assert_eq!(
570 bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead),
571 Some(11..16),
572 );
573 assert_eq!(
574 bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
575 Some(16..21),
576 );
577 assert_eq!(
578 bc.needed_blocks(peer3, count, best, common, max_parallel, max_ahead),
579 Some(21..26),
580 );
581
582 bc.clear_peer_download(&peer2);
585
586 let common = 23;
588 assert_eq!(
589 bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
590 Some(26..31), );
592 }
593
594 #[test]
595 fn range_at_the_end_above_common_number_requested() {
596 let mut bc = BlockCollection::new();
597 assert!(is_empty(&bc));
598
599 let count = 5;
600 let best = 30;
601 let max_parallel = 1;
602 let max_ahead = 200;
603
604 let peer1 = PeerId::random();
605 let peer2 = PeerId::random();
606
607 let common = 10;
608 assert_eq!(
609 bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead),
610 Some(11..16),
611 );
612 assert_eq!(
613 bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
614 Some(16..21),
615 );
616 }
617
618 #[test]
619 fn range_at_the_end_below_common_number_not_requested() {
620 let mut bc = BlockCollection::new();
621 assert!(is_empty(&bc));
622
623 let count = 5;
624 let best = 30;
625 let max_parallel = 1;
626 let max_ahead = 200;
627
628 let peer1 = PeerId::random();
629 let peer2 = PeerId::random();
630
631 let common = 10;
632 assert_eq!(
633 bc.needed_blocks(peer1, count, best, common, max_parallel, max_ahead),
634 Some(11..16),
635 );
636
637 let common = 20;
638 assert_eq!(
639 bc.needed_blocks(peer2, count, best, common, max_parallel, max_ahead),
640 Some(21..26), );
642 }
643}