1use std::cmp::{max, min};
2use std::collections::BTreeMap;
3use std::ops::RangeInclusive;
4
5use derive_where::derive_where;
6use tracing::{debug, error, info, warn};
7
8use malachitebft_core_types::utils::height::DisplayRange;
9use malachitebft_core_types::{Context, Height};
10
11use crate::co::Co;
12use crate::scoring::SyncResult;
13use crate::{
14 perform, Effect, Error, HeightStartType, InboundRequestId, Metrics, OutboundRequestId, PeerId,
15 RawDecidedValue, Request, Resume, State, Status, ValueRequest, ValueResponse,
16};
17
18#[derive_where(Debug)]
19pub enum Input<Ctx: Context> {
20 SendStatusUpdate,
22
23 Status(Status<Ctx>),
25
26 StartedHeight(Ctx::Height, HeightStartType),
29
30 Decided(Ctx::Height),
32
33 ValueRequest(InboundRequestId, PeerId, ValueRequest<Ctx>),
35
36 ValueResponse(OutboundRequestId, PeerId, Option<ValueResponse<Ctx>>),
38
39 GotDecidedValues(
41 InboundRequestId,
42 RangeInclusive<Ctx::Height>,
43 Vec<RawDecidedValue<Ctx>>,
44 ),
45
46 SyncRequestTimedOut(OutboundRequestId, PeerId, Request<Ctx>),
48
49 InvalidValue(PeerId, Ctx::Height),
51
52 ValueProcessingError(PeerId, Ctx::Height),
54}
55
56pub async fn handle<Ctx>(
57 co: Co<Ctx>,
58 state: &mut State<Ctx>,
59 metrics: &Metrics,
60 input: Input<Ctx>,
61) -> Result<(), Error<Ctx>>
62where
63 Ctx: Context,
64{
65 match input {
66 Input::SendStatusUpdate => on_send_status_update(co, state, metrics).await,
67
68 Input::Status(status) => on_status(co, state, metrics, status).await,
69
70 Input::StartedHeight(height, restart) => {
71 on_started_height(co, state, metrics, height, restart).await
72 }
73
74 Input::Decided(height) => on_decided(state, metrics, height).await,
75
76 Input::ValueRequest(request_id, peer_id, request) => {
77 on_value_request(co, state, metrics, request_id, peer_id, request).await
78 }
79
80 Input::ValueResponse(request_id, peer_id, Some(response)) => {
81 on_value_response(co, state, metrics, request_id, peer_id, response).await
82 }
83
84 Input::ValueResponse(request_id, peer_id, None) => {
85 on_invalid_value_response(co, state, metrics, request_id, peer_id).await
86 }
87
88 Input::GotDecidedValues(request_id, range, values) => {
89 on_got_decided_values(co, state, metrics, request_id, range, values).await
90 }
91
92 Input::SyncRequestTimedOut(request_id, peer_id, request) => {
93 on_sync_request_timed_out(co, state, metrics, request_id, peer_id, request).await
94 }
95
96 Input::InvalidValue(peer, value) => on_invalid_value(co, state, metrics, peer, value).await,
97
98 Input::ValueProcessingError(peer, height) => {
99 on_value_processing_error(co, state, metrics, peer, height).await
100 }
101 }
102}
103
104async fn on_value_response<Ctx>(
105 co: Co<Ctx>,
106 state: &mut State<Ctx>,
107 metrics: &Metrics,
108 request_id: OutboundRequestId,
109 peer_id: PeerId,
110 response: ValueResponse<Ctx>,
111) -> Result<(), Error<Ctx>>
112where
113 Ctx: Context,
114{
115 let start = response.start_height;
116 let end = response.end_height().unwrap_or(start);
117 let range_len = end.as_u64() - start.as_u64() + 1;
118
119 let Some((requested_range, stored_peer_id)) = state.pending_requests.get(&request_id) else {
123 warn!(%request_id, %peer_id, "Received response for unknown request ID");
124 return Ok(());
125 };
126
127 if stored_peer_id != &peer_id {
128 warn!(
129 %request_id, actual_peer = %peer_id, expected_peer = %stored_peer_id,
130 "Received response from different peer than expected"
131 );
132
133 return on_invalid_value_response(co, state, metrics, request_id, peer_id).await;
134 }
135
136 let is_valid = start.as_u64() == requested_range.start().as_u64()
137 && start.as_u64() <= end.as_u64()
138 && end.as_u64() <= requested_range.end().as_u64()
139 && response.values.len() as u64 == range_len;
140
141 if !is_valid {
142 warn!(
143 %request_id, %peer_id,
144 "Received request for wrong range of heights: expected {}..={} ({} values), got {}..={} ({} values)",
145 requested_range.start().as_u64(), requested_range.end().as_u64(), range_len,
146 start.as_u64(), end.as_u64(), response.values.len() as u64
147 );
148
149 return on_invalid_value_response(co, state, metrics, request_id, peer_id).await;
150 }
151
152 on_valid_value_response(co, state, metrics, request_id, peer_id, response).await
153}
154
155pub async fn on_send_status_update<Ctx>(
156 co: Co<Ctx>,
157 state: &mut State<Ctx>,
158 _metrics: &Metrics,
159) -> Result<(), Error<Ctx>>
160where
161 Ctx: Context,
162{
163 debug!(tip_height = %state.tip_height, "Broadcasting status");
164
165 perform!(
166 co,
167 Effect::BroadcastStatus(state.tip_height, Default::default())
168 );
169
170 if let Some(inactive_threshold) = state.config.inactive_threshold {
171 state
173 .peer_scorer
174 .reset_inactive_peers_scores(inactive_threshold);
175 }
176
177 debug!("Peer scores: {:?}", state.peer_scorer.get_scores());
178
179 Ok(())
180}
181
182pub async fn on_status<Ctx>(
183 co: Co<Ctx>,
184 state: &mut State<Ctx>,
185 metrics: &Metrics,
186 status: Status<Ctx>,
187) -> Result<(), Error<Ctx>>
188where
189 Ctx: Context,
190{
191 let peer_id = status.peer_id;
192 let peer_height = status.tip_height;
193
194 debug!(%peer_id, %peer_height, "Received peer status");
195
196 state.update_status(status);
197 metrics.status_received(state.peers.len() as u64);
198
199 if !state.started {
200 return Ok(());
202 }
203
204 if peer_height >= state.sync_height {
205 info!(
206 tip_height = %state.tip_height,
207 sync_height = %state.sync_height,
208 peer_height = %peer_height,
209 "SYNC REQUIRED: Falling behind"
210 );
211
212 request_values(co, state, metrics).await?;
215 }
216
217 Ok(())
218}
219
220pub async fn on_started_height<Ctx>(
221 co: Co<Ctx>,
222 state: &mut State<Ctx>,
223 metrics: &Metrics,
224 height: Ctx::Height,
225 start_type: HeightStartType,
226) -> Result<(), Error<Ctx>>
227where
228 Ctx: Context,
229{
230 debug!(%height, is_restart = %start_type.is_restart(), "Consensus started new height");
231
232 state.started = true;
233 state.consensus_height = height;
234
235 state.tip_height = height.decrement().unwrap_or_default();
237
238 state.prune_pending_requests();
240
241 if start_type.is_restart() {
242 state.sync_height = height;
244 state.pending_requests.clear();
246 } else {
247 state.sync_height = max(state.sync_height, height);
249 }
250
251 request_values(co, state, metrics).await?;
253
254 Ok(())
255}
256
257pub async fn on_decided<Ctx>(
258 state: &mut State<Ctx>,
259 _metrics: &Metrics,
260 height: Ctx::Height,
261) -> Result<(), Error<Ctx>>
262where
263 Ctx: Context,
264{
265 debug!(%height, "Consensus decided on new value");
266
267 state.tip_height = height;
268
269 state.prune_pending_requests();
271
272 if state.sync_height == state.tip_height {
274 state.sync_height = state.sync_height.increment();
275 }
276
277 Ok(())
278}
279
280#[tracing::instrument(
281 name = "on_value_request",
282 skip_all,
283 fields(
284 peer_id = %peer_id,
285 request_id = %request_id,
286 range = %DisplayRange(&request.range)
287 )
288)]
289pub async fn on_value_request<Ctx>(
290 co: Co<Ctx>,
291 state: &mut State<Ctx>,
292 metrics: &Metrics,
293 request_id: InboundRequestId,
294 peer_id: PeerId,
295 request: ValueRequest<Ctx>,
296) -> Result<(), Error<Ctx>>
297where
298 Ctx: Context,
299{
300 debug!("Received request for values");
301
302 if !validate_request_range::<Ctx>(&request.range, state.tip_height, state.config.batch_size) {
303 debug!("Sending empty response to peer");
304
305 perform!(
306 co,
307 Effect::SendValueResponse(
308 request_id.clone(),
309 ValueResponse::new(*request.range.start(), vec![]),
310 Default::default()
311 )
312 );
313
314 return Ok(());
315 }
316
317 metrics.value_request_received(request.range.start().as_u64());
318
319 let range = clamp_request_range::<Ctx>(&request.range, state.tip_height);
320
321 if range != request.range {
322 debug!(
323 requested = %DisplayRange(&request.range),
324 clamped = %DisplayRange(&range),
325 "Clamped request range to our tip height"
326 );
327 }
328
329 perform!(
330 co,
331 Effect::GetDecidedValues(request_id, range, Default::default())
332 );
333
334 Ok(())
335}
336
337fn validate_request_range<Ctx>(
338 range: &RangeInclusive<Ctx::Height>,
339 tip_height: Ctx::Height,
340 batch_size: usize,
341) -> bool
342where
343 Ctx: Context,
344{
345 if range.is_empty() {
346 debug!("Received request for empty range of values");
347 return false;
348 }
349
350 if range.start() > range.end() {
351 debug!("Received request for invalid range of values");
352 return false;
353 }
354
355 if range.start() > &tip_height {
356 debug!("Received request for values beyond our tip height {tip_height}");
357 return false;
358 }
359
360 let len = (range.end().as_u64() - range.start().as_u64()).saturating_add(1) as usize;
361 if len > batch_size {
362 warn!("Received request for too many values: requested {len}, max is {batch_size}");
363 return false;
364 }
365
366 true
367}
368
369fn clamp_request_range<Ctx>(
370 range: &RangeInclusive<Ctx::Height>,
371 tip_height: Ctx::Height,
372) -> RangeInclusive<Ctx::Height>
373where
374 Ctx: Context,
375{
376 assert!(!range.is_empty(), "Cannot clamp an empty range");
377 assert!(
378 *range.start() <= tip_height,
379 "Cannot clamp range starting above tip height"
380 );
381
382 let start = *range.start();
383 let end = min(*range.end(), tip_height);
384 start..=end
385}
386
387pub async fn on_valid_value_response<Ctx>(
388 co: Co<Ctx>,
389 state: &mut State<Ctx>,
390 metrics: &Metrics,
391 request_id: OutboundRequestId,
392 peer_id: PeerId,
393 response: ValueResponse<Ctx>,
394) -> Result<(), Error<Ctx>>
395where
396 Ctx: Context,
397{
398 let start = response.start_height;
399 debug!(start = %start, num_values = %response.values.len(), %peer_id, "Received response from peer");
400
401 if let Some(response_time) = metrics.value_response_received(start.as_u64()) {
402 state.peer_scorer.update_score_with_metrics(
403 peer_id,
404 SyncResult::Success(response_time),
405 &metrics.scoring,
406 );
407 }
408
409 let values_count = response.values.len();
410
411 perform!(
413 co,
414 Effect::ProcessValueResponse(peer_id, request_id.clone(), response, Default::default())
415 );
416
417 if let Some((requested_range, stored_peer_id)) = state.pending_requests.get(&request_id) {
419 if stored_peer_id != &peer_id {
420 error!(
423 %request_id, peer.actual = %peer_id, peer.expected = %stored_peer_id,
424 "Received response from different peer than expected"
425 );
426 return on_invalid_value_response(co, state, metrics, request_id, peer_id).await;
427 }
428
429 let range_len = requested_range.end().as_u64() - requested_range.start().as_u64() + 1;
430
431 if values_count < range_len as usize {
432 let new_start = requested_range.start().increment_by(values_count as u64);
441
442 let end = *requested_range.end();
443
444 if values_count == 0 {
445 error!(%request_id, %peer_id, "Received response contains no values");
446 } else {
447 let updated_range =
450 *requested_range.start()..=new_start.decrement().unwrap_or_default();
451
452 state.update_request(request_id, peer_id, updated_range);
453 }
454
455 let new_range = new_start..=end;
457 request_values_range(co, state, metrics, new_range).await?;
458 }
459 }
460
461 Ok(())
462}
463
464pub async fn on_invalid_value_response<Ctx>(
465 co: Co<Ctx>,
466 state: &mut State<Ctx>,
467 metrics: &Metrics,
468 request_id: OutboundRequestId,
469 peer_id: PeerId,
470) -> Result<(), Error<Ctx>>
471where
472 Ctx: Context,
473{
474 debug!(%request_id, %peer_id, "Received invalid response");
475
476 state.peer_scorer.update_score(peer_id, SyncResult::Failure);
477
478 re_request_values_from_peer_except(co, state, metrics, request_id, Some(peer_id)).await?;
481
482 Ok(())
483}
484
485pub async fn on_got_decided_values<Ctx>(
486 co: Co<Ctx>,
487 _state: &mut State<Ctx>,
488 metrics: &Metrics,
489 request_id: InboundRequestId,
490 range: RangeInclusive<Ctx::Height>,
491 values: Vec<RawDecidedValue<Ctx>>,
492) -> Result<(), Error<Ctx>>
493where
494 Ctx: Context,
495{
496 info!(%request_id, range = %DisplayRange(&range), "Received {} values from host", values.len());
497
498 let start = range.start();
499 let end = range.end();
500
501 let batch_size = end.as_u64() - start.as_u64() + 1;
503 if batch_size != values.len() as u64 {
504 warn!(
505 %request_id,
506 "Received {} values from host, expected {batch_size}",
507 values.len()
508 )
509 }
510
511 let mut height = *start;
513 for value in &values {
514 if value.certificate.height != height {
515 error!(
516 %request_id,
517 "Received from host value for height {}, expected for height {height}",
518 value.certificate.height
519 );
520 }
521 height = height.increment();
522 }
523
524 debug!(%request_id, range = %DisplayRange(&range), "Sending response to peer");
525 perform!(
526 co,
527 Effect::SendValueResponse(
528 request_id,
529 ValueResponse::new(*start, values),
530 Default::default()
531 )
532 );
533
534 metrics.value_response_sent(start.as_u64());
535
536 Ok(())
537}
538
539pub async fn on_sync_request_timed_out<Ctx>(
540 co: Co<Ctx>,
541 state: &mut State<Ctx>,
542 metrics: &Metrics,
543 request_id: OutboundRequestId,
544 peer_id: PeerId,
545 request: Request<Ctx>,
546) -> Result<(), Error<Ctx>>
547where
548 Ctx: Context,
549{
550 match request {
551 Request::ValueRequest(value_request) => {
552 info!(%peer_id, range = %DisplayRange(&value_request.range), "Sync request timed out");
553
554 state.peer_scorer.update_score(peer_id, SyncResult::Timeout);
555
556 metrics.value_request_timed_out(value_request.range.start().as_u64());
557
558 re_request_values_from_peer_except(co, state, metrics, request_id, Some(peer_id))
559 .await?;
560 }
561 };
562
563 Ok(())
564}
565
566async fn on_invalid_value<Ctx>(
568 co: Co<Ctx>,
569 state: &mut State<Ctx>,
570 metrics: &Metrics,
571 peer_id: PeerId,
572 height: Ctx::Height,
573) -> Result<(), Error<Ctx>>
574where
575 Ctx: Context,
576{
577 error!(%peer_id, %height, "Received invalid value");
578
579 state.peer_scorer.update_score(peer_id, SyncResult::Failure);
580
581 if let Some((request_id, stored_peer_id)) = state.get_request_id_by(height) {
582 if stored_peer_id != peer_id {
583 warn!(
584 %request_id, peer.actual = %peer_id, peer.expected = %stored_peer_id,
585 "Received response from different peer than expected"
586 );
587 }
588 re_request_values_from_peer_except(co, state, metrics, request_id, Some(peer_id)).await?;
589 } else {
590 error!(%peer_id, %height, "Received height of invalid value for unknown request");
591 }
592
593 Ok(())
594}
595
596async fn on_value_processing_error<Ctx>(
597 co: Co<Ctx>,
598 state: &mut State<Ctx>,
599 metrics: &Metrics,
600 peer_id: PeerId,
601 height: Ctx::Height,
602) -> Result<(), Error<Ctx>>
603where
604 Ctx: Context,
605{
606 error!(%peer_id, %height, "Error while processing value");
607
608 if let Some((request_id, _)) = state.get_request_id_by(height) {
612 re_request_values_from_peer_except(co, state, metrics, request_id, None).await?;
613 } else {
614 error!(%peer_id, %height, "Received height of invalid value for unknown request");
615 }
616
617 Ok(())
618}
619
620async fn request_values<Ctx>(
622 co: Co<Ctx>,
623 state: &mut State<Ctx>,
624 metrics: &Metrics,
625) -> Result<(), Error<Ctx>>
626where
627 Ctx: Context,
628{
629 let max_parallel_requests = state.max_parallel_requests();
630
631 if state.pending_requests.len() >= max_parallel_requests {
632 info!(
633 max_parallel_requests,
634 pending_requests = state.pending_requests.len(),
635 "Maximum number of parallel requests reached, skipping request for values"
636 );
637
638 return Ok(());
639 };
640
641 while state.pending_requests.len() < max_parallel_requests {
642 let initial_height = state.sync_height;
644 let range = find_next_uncovered_range_from::<Ctx>(
645 initial_height,
646 state.config.batch_size as u64,
647 &state.pending_requests,
648 );
649
650 let Some((peer, range)) = state.random_peer_with(&range) else {
652 debug!("No peer to request sync from");
653 break;
655 };
656
657 send_and_track_request_to_peer(&co, state, metrics, peer, range).await?;
658 }
659
660 Ok(())
661}
662
663async fn request_values_range<Ctx>(
666 co: Co<Ctx>,
667 state: &mut State<Ctx>,
668 metrics: &Metrics,
669 range: RangeInclusive<Ctx::Height>,
670) -> Result<(), Error<Ctx>>
671where
672 Ctx: Context,
673{
674 let max_parallel_requests = state.max_parallel_requests();
680
681 if state.pending_requests.len() >= max_parallel_requests {
682 info!(
683 %max_parallel_requests,
684 pending_requests = %state.pending_requests.len(),
685 "Maximum number of pending requests reached when re-requesting a partial range of values"
686 );
687 };
688
689 let Some((peer, range)) = state.random_peer_with(&range) else {
691 debug!(range = %DisplayRange(&range), "No peer to request sync from");
693 return Ok(());
694 };
695
696 send_and_track_request_to_peer(&co, state, metrics, peer, range).await?;
697
698 Ok(())
699}
700
701async fn send_and_track_request_to_peer<Ctx>(
702 co: &Co<Ctx>,
703 state: &mut State<Ctx>,
704 metrics: &Metrics,
705 peer: PeerId,
706 range: RangeInclusive<<Ctx as Context>::Height>,
707) -> Result<(), Error<Ctx>>
708where
709 Ctx: Context,
710{
711 let Some((request_id, final_range)) =
713 send_request_to_peer(co, state, metrics, range, peer).await?
714 else {
715 return Ok(()); };
717
718 state
720 .pending_requests
721 .insert(request_id, (final_range.clone(), peer));
722
723 let next_sync_base = final_range.end().increment();
725 state.sync_height = find_next_uncovered_height::<Ctx>(next_sync_base, &state.pending_requests);
726
727 Ok(())
728}
729
730async fn send_request_to_peer<Ctx>(
733 co: &Co<Ctx>,
734 state: &mut State<Ctx>,
735 metrics: &Metrics,
736 range: RangeInclusive<Ctx::Height>,
737 peer: PeerId,
738) -> Result<Option<(OutboundRequestId, RangeInclusive<Ctx::Height>)>, Error<Ctx>>
739where
740 Ctx: Context,
741{
742 if range.is_empty() {
743 debug!(%peer, "Range is empty, skipping request");
744 return Ok(None);
745 }
746
747 let range = state.trim_validated_heights(&range);
750
751 if range.is_empty() {
752 warn!(
753 range = %DisplayRange(&range), %peer,
754 "All values in range have been validated, skipping request"
755 );
756
757 return Ok(None);
758 }
759
760 info!(range = %DisplayRange(&range), %peer, "Requesting sync from peer");
761
762 let Some(request_id) = perform!(
764 co,
765 Effect::SendValueRequest(peer, ValueRequest::new(range.clone()), Default::default()),
766 Resume::ValueRequestId(id) => id,
767 ) else {
768 warn!(range = %DisplayRange(&range), %peer, "Failed to send sync request to peer");
769 return Ok(None);
770 };
771
772 metrics.value_request_sent(range.start().as_u64());
773 debug!(%request_id, range = %DisplayRange(&range), %peer, "Sent sync request to peer");
774
775 Ok(Some((request_id, range)))
776}
777
778async fn re_request_values_from_peer_except<Ctx>(
781 co: Co<Ctx>,
782 state: &mut State<Ctx>,
783 metrics: &Metrics,
784 request_id: OutboundRequestId,
785 except_peer_id: Option<PeerId>,
786) -> Result<(), Error<Ctx>>
787where
788 Ctx: Context,
789{
790 info!(%request_id, except_peer_id = ?except_peer_id, "Re-requesting values from peer");
791
792 let Some((range, stored_peer_id)) = state.pending_requests.remove(&request_id.clone()) else {
793 warn!(%request_id, "Unknown request ID when re-requesting values");
794 return Ok(());
795 };
796
797 let except_peer_id = match except_peer_id {
798 Some(peer_id) if stored_peer_id == peer_id => Some(peer_id),
799 Some(peer_id) => {
800 warn!(
801 %request_id,
802 peer.actual = %peer_id,
803 peer.expected = %stored_peer_id,
804 "Received response from different peer than expected"
805 );
806
807 Some(stored_peer_id)
808 }
809 None => None,
810 };
811
812 let Some((peer, peer_range)) = state.random_peer_with_except(&range, except_peer_id) else {
813 debug!("No peer to re-request sync from");
814 state.sync_height = min(state.sync_height, *range.start());
816 return Ok(());
817 };
818
819 send_and_track_request_to_peer(&co, state, metrics, peer, peer_range).await?;
820
821 Ok(())
822}
823
824fn find_next_uncovered_range_from<Ctx>(
837 initial_height: Ctx::Height,
838 max_range_size: u64,
839 pending_requests: &BTreeMap<OutboundRequestId, (RangeInclusive<Ctx::Height>, PeerId)>,
840) -> RangeInclusive<Ctx::Height>
841where
842 Ctx: Context,
843{
844 let max_batch_size = max(1, max_range_size);
845
846 let next_range = pending_requests
848 .values()
849 .map(|(range, _)| range)
850 .filter(|range| *range.end() >= initial_height)
851 .min_by_key(|range| range.start());
852
853 let mut end_height = initial_height.increment_by(max_batch_size - 1);
855
856 if let Some(range) = next_range {
858 if range.contains(&initial_height) {
860 panic!(
861 "Bug: initial_height {} is already covered by a pending request. This should never happen.",
862 initial_height.as_u64()
863 );
864 }
865
866 let boundary_end = range
868 .start()
869 .decrement()
870 .expect("range.start() should be decrementable since it's > initial_height");
871 end_height = min(end_height, boundary_end);
872 }
873
874 initial_height..=end_height
875}
876
877fn find_next_uncovered_height<Ctx>(
879 starting_height: Ctx::Height,
880 pending_requests: &BTreeMap<OutboundRequestId, (RangeInclusive<Ctx::Height>, PeerId)>,
881) -> Ctx::Height
882where
883 Ctx: Context,
884{
885 let mut next_height = starting_height;
886 while let Some((covered_range, _)) = pending_requests
887 .values()
888 .find(|(r, _)| r.contains(&next_height))
889 {
890 next_height = covered_range.end().increment();
891 }
892 next_height
893}
894
895#[cfg(test)]
896mod tests {
897 use super::*;
898 use arc_malachitebft_test::{Height, TestContext};
899 use std::collections::BTreeMap;
900
901 type TestPendingRequests = BTreeMap<OutboundRequestId, (RangeInclusive<Height>, PeerId)>;
902
903 struct RangeTestCase {
906 name: &'static str,
907 initial_height: u64,
908 max_size: u64,
909 pending_ranges: &'static [(u64, u64)], expected_start: u64,
911 expected_end: u64,
912 }
913
914 struct PanicTestCase {
915 name: &'static str,
916 initial_height: u64,
917 max_size: u64,
918 pending_ranges: &'static [(u64, u64)], expected_panic_msg: &'static str,
920 }
921
922 struct HeightTestCase {
923 name: &'static str,
924 initial_height: u64,
925 pending_ranges: &'static [(u64, u64)], expected_height: u64,
927 }
928
929 #[test]
932 fn test_find_next_uncovered_range_from_table() {
933 let test_cases = [
934 RangeTestCase {
935 name: "no pending requests",
936 initial_height: 10,
937 max_size: 5,
938 pending_ranges: &[],
939 expected_start: 10,
940 expected_end: 14,
941 },
942 RangeTestCase {
943 name: "max size one",
944 initial_height: 10,
945 max_size: 1,
946 pending_ranges: &[],
947 expected_start: 10,
948 expected_end: 10,
949 },
950 RangeTestCase {
951 name: "with blocking request",
952 initial_height: 10,
953 max_size: 5,
954 pending_ranges: &[(12, 15)],
955 expected_start: 10,
956 expected_end: 11,
957 },
958 RangeTestCase {
959 name: "zero max size becomes one",
960 initial_height: 10,
961 max_size: 0, pending_ranges: &[],
963 expected_start: 10,
964 expected_end: 10,
965 },
966 RangeTestCase {
967 name: "range starts immediately after",
968 initial_height: 15,
969 max_size: 5,
970 pending_ranges: &[(16, 20)],
971 expected_start: 15,
972 expected_end: 15, },
974 RangeTestCase {
975 name: "height zero with range starting at one",
976 initial_height: 0,
977 max_size: 3,
978 pending_ranges: &[(1, 5)],
979 expected_start: 0,
980 expected_end: 0, },
982 RangeTestCase {
983 name: "sync height just at range end",
984 initial_height: 11,
985 max_size: 4,
986 pending_ranges: &[(5, 10)],
987 expected_start: 11,
988 expected_end: 14, },
990 RangeTestCase {
991 name: "fill gap between ranges",
992 initial_height: 12,
993 max_size: 6,
994 pending_ranges: &[(5, 10), (20, 25)],
995 expected_start: 12,
996 expected_end: 17, },
998 ];
999
1000 for case in test_cases {
1001 let mut pending_requests = TestPendingRequests::new();
1002
1003 for (i, &(start, end)) in case.pending_ranges.iter().enumerate() {
1005 let peer = PeerId::random();
1006 pending_requests.insert(
1007 OutboundRequestId::new(format!("req{}", i + 1)),
1008 (Height::new(start)..=Height::new(end), peer),
1009 );
1010 }
1011
1012 let result = find_next_uncovered_range_from::<TestContext>(
1013 Height::new(case.initial_height),
1014 case.max_size,
1015 &pending_requests,
1016 );
1017
1018 assert_eq!(
1019 result,
1020 Height::new(case.expected_start)..=Height::new(case.expected_end),
1021 "Test case '{}' failed",
1022 case.name
1023 );
1024 }
1025 }
1026
1027 #[test]
1030 fn test_find_next_uncovered_range_from_panic_cases() {
1031 let test_cases = [
1032 PanicTestCase {
1033 name: "sync height covered",
1034 initial_height: 12,
1035 max_size: 3,
1036 pending_ranges: &[(10, 15)],
1037 expected_panic_msg:
1038 "Bug: initial_height 12 is already covered by a pending request",
1039 },
1040 PanicTestCase {
1041 name: "initial height equals range start",
1042 initial_height: 15,
1043 max_size: 5,
1044 pending_ranges: &[(15, 20)],
1045 expected_panic_msg:
1046 "Bug: initial_height 15 is already covered by a pending request",
1047 },
1048 PanicTestCase {
1049 name: "sync height equals range end",
1050 initial_height: 15,
1051 max_size: 3,
1052 pending_ranges: &[(10, 15)],
1053 expected_panic_msg:
1054 "Bug: initial_height 15 is already covered by a pending request",
1055 },
1056 PanicTestCase {
1057 name: "multiple consecutive blocks",
1058 initial_height: 16,
1059 max_size: 3,
1060 pending_ranges: &[(10, 15), (16, 20)],
1061 expected_panic_msg:
1062 "Bug: initial_height 16 is already covered by a pending request",
1063 },
1064 PanicTestCase {
1065 name: "sync height zero with range starting at zero",
1066 initial_height: 0,
1067 max_size: 3,
1068 pending_ranges: &[(0, 5)],
1069 expected_panic_msg: "Bug: initial_height 0 is already covered by a pending request",
1070 },
1071 ];
1072
1073 for case in test_cases {
1074 let mut pending_requests = TestPendingRequests::new();
1075
1076 for (i, &(start, end)) in case.pending_ranges.iter().enumerate() {
1078 let peer = PeerId::random();
1079 pending_requests.insert(
1080 OutboundRequestId::new(format!("req{}", i + 1)),
1081 (Height::new(start)..=Height::new(end), peer),
1082 );
1083 }
1084
1085 let result = std::panic::catch_unwind(|| {
1086 find_next_uncovered_range_from::<TestContext>(
1087 Height::new(case.initial_height),
1088 case.max_size,
1089 &pending_requests,
1090 )
1091 });
1092
1093 assert!(
1094 result.is_err(),
1095 "Test case '{}' should have panicked but didn't",
1096 case.name
1097 );
1098
1099 if let Err(panic_value) = result {
1100 if let Some(panic_msg) = panic_value.downcast_ref::<String>() {
1101 assert!(
1102 panic_msg.contains(case.expected_panic_msg),
1103 "Test case '{}' panicked with wrong message. Expected: '{}', Got: '{}'",
1104 case.name,
1105 case.expected_panic_msg,
1106 panic_msg
1107 );
1108 } else if let Some(panic_msg) = panic_value.downcast_ref::<&str>() {
1109 assert!(
1110 panic_msg.contains(case.expected_panic_msg),
1111 "Test case '{}' panicked with wrong message. Expected: '{}', Got: '{}'",
1112 case.name,
1113 case.expected_panic_msg,
1114 panic_msg
1115 );
1116 }
1117 }
1118 }
1119 }
1120
1121 #[test]
1124 fn test_find_next_uncovered_height_table() {
1125 let test_cases = [
1126 HeightTestCase {
1127 name: "no pending requests",
1128 initial_height: 10,
1129 pending_ranges: &[],
1130 expected_height: 10,
1131 },
1132 HeightTestCase {
1133 name: "starting height covered",
1134 initial_height: 12,
1135 pending_ranges: &[(10, 15)],
1136 expected_height: 16, },
1138 HeightTestCase {
1139 name: "starting height match request start",
1140 initial_height: 10,
1141 pending_ranges: &[(10, 15)],
1142 expected_height: 16, },
1144 HeightTestCase {
1145 name: "starting height match request end",
1146 initial_height: 15,
1147 pending_ranges: &[(10, 15)],
1148 expected_height: 16, },
1150 HeightTestCase {
1151 name: "starting height just before request start",
1152 initial_height: 9,
1153 pending_ranges: &[(10, 15)],
1154 expected_height: 9, },
1156 HeightTestCase {
1157 name: "multiple consecutive ranges",
1158 initial_height: 10,
1159 pending_ranges: &[(10, 15), (16, 20)],
1160 expected_height: 21, },
1162 HeightTestCase {
1163 name: "multiple consecutive ranges with a gap",
1164 initial_height: 10,
1165 pending_ranges: &[(10, 15), (16, 20), (24, 30)],
1166 expected_height: 21, },
1168 HeightTestCase {
1169 name: "starting height covered multiple",
1170 initial_height: 12,
1171 pending_ranges: &[(10, 15), (15, 20)],
1172 expected_height: 21, },
1174 ];
1175
1176 for case in test_cases {
1177 let mut pending_requests = TestPendingRequests::new();
1178
1179 for (i, &(start, end)) in case.pending_ranges.iter().enumerate() {
1181 let peer = PeerId::random();
1182 pending_requests.insert(
1183 OutboundRequestId::new(format!("req{}", i + 1)),
1184 (Height::new(start)..=Height::new(end), peer),
1185 );
1186 }
1187
1188 let result = find_next_uncovered_height::<TestContext>(
1189 Height::new(case.initial_height),
1190 &pending_requests,
1191 );
1192
1193 assert_eq!(
1194 result,
1195 Height::new(case.expected_height),
1196 "Test case '{}' failed",
1197 case.name
1198 );
1199 }
1200 }
1201
1202 #[test]
1203 fn test_validate_request_range() {
1204 let validate = validate_request_range::<TestContext>;
1205
1206 let tip_height = Height::new(20);
1207 let batch_size = 5;
1208
1209 let range = Height::new(15)..=Height::new(19);
1211 assert!(validate(&range, tip_height, batch_size));
1212
1213 let range = Height::new(18)..=Height::new(17);
1215 assert!(!validate(&range, tip_height, batch_size));
1216
1217 let range = Height::new(21)..=Height::new(25);
1219 assert!(!validate(&range, tip_height, batch_size));
1220
1221 let range = Height::new(10)..=Height::new(16);
1223 assert!(!validate(&range, tip_height, batch_size));
1224
1225 let range = Height::new(0)..=Height::new(u64::MAX);
1227 assert!(!validate(&range, tip_height, batch_size));
1228 }
1229
1230 #[test]
1231 fn test_clamp_request_range() {
1232 let clamp = clamp_request_range::<TestContext>;
1233
1234 let tip_height = Height::new(20);
1235
1236 let range = Height::new(15)..=Height::new(18);
1238 let clamped = clamp(&range, tip_height);
1239 assert_eq!(clamped, range);
1240
1241 let range = Height::new(18)..=Height::new(25);
1243 let clamped = clamp(&range, tip_height);
1244 assert_eq!(clamped, Height::new(18)..=tip_height);
1245
1246 let range = tip_height..=Height::new(25);
1248 let clamped = clamp(&range, tip_height);
1249 assert_eq!(clamped, tip_height..=tip_height);
1250 }
1251}