Skip to main content

arc_malachitebft_sync/
handle.rs

1use std::cmp::{max, min};
2use std::collections::BTreeMap;
3use std::ops::RangeInclusive;
4
5use derive_where::derive_where;
6use tracing::{debug, error, info, warn};
7
8use malachitebft_core_types::utils::height::DisplayRange;
9use malachitebft_core_types::{Context, Height};
10
11use crate::co::Co;
12use crate::scoring::SyncResult;
13use crate::{
14    perform, Effect, Error, HeightStartType, InboundRequestId, Metrics, OutboundRequestId, PeerId,
15    RawDecidedValue, Request, Resume, State, Status, ValueRequest, ValueResponse,
16};
17
18#[derive_where(Debug)]
19pub enum Input<Ctx: Context> {
20    /// Periodical event triggering the broadcast of a status update
21    SendStatusUpdate,
22
23    /// A status update has been received from a peer
24    Status(Status<Ctx>),
25
26    /// Consensus just started a new height.
27    /// The boolean indicates whether this was a restart or a new start.
28    StartedHeight(Ctx::Height, HeightStartType),
29
30    /// Consensus just decided on a new value
31    Decided(Ctx::Height),
32
33    /// A ValueSync request has been received from a peer
34    ValueRequest(InboundRequestId, PeerId, ValueRequest<Ctx>),
35
36    /// A (possibly empty or invalid) ValueSync response has been received
37    ValueResponse(OutboundRequestId, PeerId, Option<ValueResponse<Ctx>>),
38
39    /// Got a response from the application to our `GetDecidedValues` request
40    GotDecidedValues(
41        InboundRequestId,
42        RangeInclusive<Ctx::Height>,
43        Vec<RawDecidedValue<Ctx>>,
44    ),
45
46    /// A request for a value timed out
47    SyncRequestTimedOut(OutboundRequestId, PeerId, Request<Ctx>),
48
49    /// We received an invalid value (either certificate or value)
50    InvalidValue(PeerId, Ctx::Height),
51
52    /// An error occurred while processing a value
53    ValueProcessingError(PeerId, Ctx::Height),
54}
55
56pub async fn handle<Ctx>(
57    co: Co<Ctx>,
58    state: &mut State<Ctx>,
59    metrics: &Metrics,
60    input: Input<Ctx>,
61) -> Result<(), Error<Ctx>>
62where
63    Ctx: Context,
64{
65    match input {
66        Input::SendStatusUpdate => on_send_status_update(co, state, metrics).await,
67
68        Input::Status(status) => on_status(co, state, metrics, status).await,
69
70        Input::StartedHeight(height, restart) => {
71            on_started_height(co, state, metrics, height, restart).await
72        }
73
74        Input::Decided(height) => on_decided(state, metrics, height).await,
75
76        Input::ValueRequest(request_id, peer_id, request) => {
77            on_value_request(co, state, metrics, request_id, peer_id, request).await
78        }
79
80        Input::ValueResponse(request_id, peer_id, Some(response)) => {
81            on_value_response(co, state, metrics, request_id, peer_id, response).await
82        }
83
84        Input::ValueResponse(request_id, peer_id, None) => {
85            on_invalid_value_response(co, state, metrics, request_id, peer_id).await
86        }
87
88        Input::GotDecidedValues(request_id, range, values) => {
89            on_got_decided_values(co, state, metrics, request_id, range, values).await
90        }
91
92        Input::SyncRequestTimedOut(request_id, peer_id, request) => {
93            on_sync_request_timed_out(co, state, metrics, request_id, peer_id, request).await
94        }
95
96        Input::InvalidValue(peer, value) => on_invalid_value(co, state, metrics, peer, value).await,
97
98        Input::ValueProcessingError(peer, height) => {
99            on_value_processing_error(co, state, metrics, peer, height).await
100        }
101    }
102}
103
104async fn on_value_response<Ctx>(
105    co: Co<Ctx>,
106    state: &mut State<Ctx>,
107    metrics: &Metrics,
108    request_id: OutboundRequestId,
109    peer_id: PeerId,
110    response: ValueResponse<Ctx>,
111) -> Result<(), Error<Ctx>>
112where
113    Ctx: Context,
114{
115    let start = response.start_height;
116    let end = response.end_height().unwrap_or(start);
117    let range_len = end.as_u64() - start.as_u64() + 1;
118
119    // Check if the response is valid. A valid response starts at the
120    // requested start height, has at least one value, and no more than
121    // the requested range.
122    let Some((requested_range, stored_peer_id)) = state.pending_requests.get(&request_id) else {
123        warn!(%request_id, %peer_id, "Received response for unknown request ID");
124        return Ok(());
125    };
126
127    if stored_peer_id != &peer_id {
128        warn!(
129            %request_id, actual_peer = %peer_id, expected_peer = %stored_peer_id,
130            "Received response from different peer than expected"
131        );
132
133        return on_invalid_value_response(co, state, metrics, request_id, peer_id).await;
134    }
135
136    let is_valid = start.as_u64() == requested_range.start().as_u64()
137        && start.as_u64() <= end.as_u64()
138        && end.as_u64() <= requested_range.end().as_u64()
139        && response.values.len() as u64 == range_len;
140
141    if !is_valid {
142        warn!(
143            %request_id, %peer_id,
144            "Received request for wrong range of heights: expected {}..={} ({} values), got {}..={} ({} values)",
145            requested_range.start().as_u64(), requested_range.end().as_u64(), range_len,
146            start.as_u64(), end.as_u64(), response.values.len() as u64
147        );
148
149        return on_invalid_value_response(co, state, metrics, request_id, peer_id).await;
150    }
151
152    on_valid_value_response(co, state, metrics, request_id, peer_id, response).await
153}
154
155pub async fn on_send_status_update<Ctx>(
156    co: Co<Ctx>,
157    state: &mut State<Ctx>,
158    _metrics: &Metrics,
159) -> Result<(), Error<Ctx>>
160where
161    Ctx: Context,
162{
163    debug!(tip_height = %state.tip_height, "Broadcasting status");
164
165    perform!(
166        co,
167        Effect::BroadcastStatus(state.tip_height, Default::default())
168    );
169
170    if let Some(inactive_threshold) = state.config.inactive_threshold {
171        // If we are at or above the inactive threshold, we can prune inactive peers.
172        state
173            .peer_scorer
174            .reset_inactive_peers_scores(inactive_threshold);
175    }
176
177    debug!("Peer scores: {:?}", state.peer_scorer.get_scores());
178
179    Ok(())
180}
181
182pub async fn on_status<Ctx>(
183    co: Co<Ctx>,
184    state: &mut State<Ctx>,
185    metrics: &Metrics,
186    status: Status<Ctx>,
187) -> Result<(), Error<Ctx>>
188where
189    Ctx: Context,
190{
191    let peer_id = status.peer_id;
192    let peer_height = status.tip_height;
193
194    debug!(%peer_id, %peer_height, "Received peer status");
195
196    state.update_status(status);
197    metrics.status_received(state.peers.len() as u64);
198
199    if !state.started {
200        // Consensus has not started yet, no need to sync (yet).
201        return Ok(());
202    }
203
204    if peer_height >= state.sync_height {
205        info!(
206            tip_height = %state.tip_height,
207            sync_height = %state.sync_height,
208            peer_height = %peer_height,
209            "SYNC REQUIRED: Falling behind"
210        );
211
212        // We are lagging behind on one of our peers at least.
213        // Request values from any peer already at or above that peer's height.
214        request_values(co, state, metrics).await?;
215    }
216
217    Ok(())
218}
219
220pub async fn on_started_height<Ctx>(
221    co: Co<Ctx>,
222    state: &mut State<Ctx>,
223    metrics: &Metrics,
224    height: Ctx::Height,
225    start_type: HeightStartType,
226) -> Result<(), Error<Ctx>>
227where
228    Ctx: Context,
229{
230    debug!(%height, is_restart = %start_type.is_restart(), "Consensus started new height");
231
232    state.started = true;
233    state.consensus_height = height;
234
235    // The tip is the last decided value.
236    state.tip_height = height.decrement().unwrap_or_default();
237
238    // Garbage collect fully-validated requests.
239    state.prune_pending_requests();
240
241    if start_type.is_restart() {
242        // Consensus is retrying the height, so we should sync starting from it.
243        state.sync_height = height;
244        // Clear pending requests, as we are restarting the height.
245        state.pending_requests.clear();
246    } else {
247        // If consensus is voting on a height that is currently being synced from a peer, do not update the sync height.
248        state.sync_height = max(state.sync_height, height);
249    }
250
251    // Trigger potential requests if possible.
252    request_values(co, state, metrics).await?;
253
254    Ok(())
255}
256
257pub async fn on_decided<Ctx>(
258    state: &mut State<Ctx>,
259    _metrics: &Metrics,
260    height: Ctx::Height,
261) -> Result<(), Error<Ctx>>
262where
263    Ctx: Context,
264{
265    debug!(%height, "Consensus decided on new value");
266
267    state.tip_height = height;
268
269    // Garbage collect pending requests for heights up to the new tip.
270    state.prune_pending_requests();
271
272    // The next height to sync should always be higher than the tip.
273    if state.sync_height == state.tip_height {
274        state.sync_height = state.sync_height.increment();
275    }
276
277    Ok(())
278}
279
280#[tracing::instrument(
281    name = "on_value_request",
282    skip_all,
283    fields(
284        peer_id = %peer_id,
285        request_id = %request_id,
286        range = %DisplayRange(&request.range)
287    )
288)]
289pub async fn on_value_request<Ctx>(
290    co: Co<Ctx>,
291    state: &mut State<Ctx>,
292    metrics: &Metrics,
293    request_id: InboundRequestId,
294    peer_id: PeerId,
295    request: ValueRequest<Ctx>,
296) -> Result<(), Error<Ctx>>
297where
298    Ctx: Context,
299{
300    debug!("Received request for values");
301
302    if !validate_request_range::<Ctx>(&request.range, state.tip_height, state.config.batch_size) {
303        debug!("Sending empty response to peer");
304
305        perform!(
306            co,
307            Effect::SendValueResponse(
308                request_id.clone(),
309                ValueResponse::new(*request.range.start(), vec![]),
310                Default::default()
311            )
312        );
313
314        return Ok(());
315    }
316
317    metrics.value_request_received(request.range.start().as_u64());
318
319    let range = clamp_request_range::<Ctx>(&request.range, state.tip_height);
320
321    if range != request.range {
322        debug!(
323            requested = %DisplayRange(&request.range),
324            clamped = %DisplayRange(&range),
325            "Clamped request range to our tip height"
326        );
327    }
328
329    perform!(
330        co,
331        Effect::GetDecidedValues(request_id, range, Default::default())
332    );
333
334    Ok(())
335}
336
337fn validate_request_range<Ctx>(
338    range: &RangeInclusive<Ctx::Height>,
339    tip_height: Ctx::Height,
340    batch_size: usize,
341) -> bool
342where
343    Ctx: Context,
344{
345    if range.is_empty() {
346        debug!("Received request for empty range of values");
347        return false;
348    }
349
350    if range.start() > range.end() {
351        debug!("Received request for invalid range of values");
352        return false;
353    }
354
355    if range.start() > &tip_height {
356        debug!("Received request for values beyond our tip height {tip_height}");
357        return false;
358    }
359
360    let len = (range.end().as_u64() - range.start().as_u64()).saturating_add(1) as usize;
361    if len > batch_size {
362        warn!("Received request for too many values: requested {len}, max is {batch_size}");
363        return false;
364    }
365
366    true
367}
368
369fn clamp_request_range<Ctx>(
370    range: &RangeInclusive<Ctx::Height>,
371    tip_height: Ctx::Height,
372) -> RangeInclusive<Ctx::Height>
373where
374    Ctx: Context,
375{
376    assert!(!range.is_empty(), "Cannot clamp an empty range");
377    assert!(
378        *range.start() <= tip_height,
379        "Cannot clamp range starting above tip height"
380    );
381
382    let start = *range.start();
383    let end = min(*range.end(), tip_height);
384    start..=end
385}
386
387pub async fn on_valid_value_response<Ctx>(
388    co: Co<Ctx>,
389    state: &mut State<Ctx>,
390    metrics: &Metrics,
391    request_id: OutboundRequestId,
392    peer_id: PeerId,
393    response: ValueResponse<Ctx>,
394) -> Result<(), Error<Ctx>>
395where
396    Ctx: Context,
397{
398    let start = response.start_height;
399    debug!(start = %start, num_values = %response.values.len(), %peer_id, "Received response from peer");
400
401    if let Some(response_time) = metrics.value_response_received(start.as_u64()) {
402        state.peer_scorer.update_score_with_metrics(
403            peer_id,
404            SyncResult::Success(response_time),
405            &metrics.scoring,
406        );
407    }
408
409    let values_count = response.values.len();
410
411    // Tell consensus to process the response.
412    perform!(
413        co,
414        Effect::ProcessValueResponse(peer_id, request_id.clone(), response, Default::default())
415    );
416
417    // If the response contains a prefix of the requested values, re-request the remaining values.
418    if let Some((requested_range, stored_peer_id)) = state.pending_requests.get(&request_id) {
419        if stored_peer_id != &peer_id {
420            // Defensive check: This should never happen because this check is already performed in
421            // the handler of `Input::ValueResponse`.
422            error!(
423                %request_id, peer.actual = %peer_id, peer.expected = %stored_peer_id,
424                "Received response from different peer than expected"
425            );
426            return on_invalid_value_response(co, state, metrics, request_id, peer_id).await;
427        }
428
429        let range_len = requested_range.end().as_u64() - requested_range.start().as_u64() + 1;
430
431        if values_count < range_len as usize {
432            // NOTE: We cannot simply call `re_request_values_from_peer_except` here.
433            // Although we received some values from the peer, these values have not yet been processed
434            // by the consensus engine. If we called `re_request_values_from_peer_except`, we would
435            // end up re-requesting the entire original range (including values we already received),
436            // causing the syncing peer to repeatedly send multiple requests until the already-received
437            // values are fully processed.
438            // To tackle this, we first update the current pending request with the range of values
439            // it provides we received, and then issue a new request with the remaining values.
440            let new_start = requested_range.start().increment_by(values_count as u64);
441
442            let end = *requested_range.end();
443
444            if values_count == 0 {
445                error!(%request_id, %peer_id, "Received response contains no values");
446            } else {
447                // The response of this request only provided `response.values.len()` values,
448                // so update the pending request accordingly
449                let updated_range =
450                    *requested_range.start()..=new_start.decrement().unwrap_or_default();
451
452                state.update_request(request_id, peer_id, updated_range);
453            }
454
455            // Issue a new request to any peer, not necessarily the same one, for the remaining values
456            let new_range = new_start..=end;
457            request_values_range(co, state, metrics, new_range).await?;
458        }
459    }
460
461    Ok(())
462}
463
464pub async fn on_invalid_value_response<Ctx>(
465    co: Co<Ctx>,
466    state: &mut State<Ctx>,
467    metrics: &Metrics,
468    request_id: OutboundRequestId,
469    peer_id: PeerId,
470) -> Result<(), Error<Ctx>>
471where
472    Ctx: Context,
473{
474    debug!(%request_id, %peer_id, "Received invalid response");
475
476    state.peer_scorer.update_score(peer_id, SyncResult::Failure);
477
478    // We do not trust the response, so we remove the pending request and re-request
479    // the whole range from another peer.
480    re_request_values_from_peer_except(co, state, metrics, request_id, Some(peer_id)).await?;
481
482    Ok(())
483}
484
485pub async fn on_got_decided_values<Ctx>(
486    co: Co<Ctx>,
487    _state: &mut State<Ctx>,
488    metrics: &Metrics,
489    request_id: InboundRequestId,
490    range: RangeInclusive<Ctx::Height>,
491    values: Vec<RawDecidedValue<Ctx>>,
492) -> Result<(), Error<Ctx>>
493where
494    Ctx: Context,
495{
496    info!(%request_id, range = %DisplayRange(&range), "Received {} values from host", values.len());
497
498    let start = range.start();
499    let end = range.end();
500
501    // Validate response from host
502    let batch_size = end.as_u64() - start.as_u64() + 1;
503    if batch_size != values.len() as u64 {
504        warn!(
505            %request_id,
506            "Received {} values from host, expected {batch_size}",
507            values.len()
508        )
509    }
510
511    // Validate the height of each received value
512    let mut height = *start;
513    for value in &values {
514        if value.certificate.height != height {
515            error!(
516                %request_id,
517                "Received from host value for height {}, expected for height {height}",
518                value.certificate.height
519            );
520        }
521        height = height.increment();
522    }
523
524    debug!(%request_id, range = %DisplayRange(&range), "Sending response to peer");
525    perform!(
526        co,
527        Effect::SendValueResponse(
528            request_id,
529            ValueResponse::new(*start, values),
530            Default::default()
531        )
532    );
533
534    metrics.value_response_sent(start.as_u64());
535
536    Ok(())
537}
538
539pub async fn on_sync_request_timed_out<Ctx>(
540    co: Co<Ctx>,
541    state: &mut State<Ctx>,
542    metrics: &Metrics,
543    request_id: OutboundRequestId,
544    peer_id: PeerId,
545    request: Request<Ctx>,
546) -> Result<(), Error<Ctx>>
547where
548    Ctx: Context,
549{
550    match request {
551        Request::ValueRequest(value_request) => {
552            info!(%peer_id, range = %DisplayRange(&value_request.range), "Sync request timed out");
553
554            state.peer_scorer.update_score(peer_id, SyncResult::Timeout);
555
556            metrics.value_request_timed_out(value_request.range.start().as_u64());
557
558            re_request_values_from_peer_except(co, state, metrics, request_id, Some(peer_id))
559                .await?;
560        }
561    };
562
563    Ok(())
564}
565
566// When receiving an invalid value, re-request the whole batch from another peer.
567async fn on_invalid_value<Ctx>(
568    co: Co<Ctx>,
569    state: &mut State<Ctx>,
570    metrics: &Metrics,
571    peer_id: PeerId,
572    height: Ctx::Height,
573) -> Result<(), Error<Ctx>>
574where
575    Ctx: Context,
576{
577    error!(%peer_id, %height, "Received invalid value");
578
579    state.peer_scorer.update_score(peer_id, SyncResult::Failure);
580
581    if let Some((request_id, stored_peer_id)) = state.get_request_id_by(height) {
582        if stored_peer_id != peer_id {
583            warn!(
584                %request_id, peer.actual = %peer_id, peer.expected = %stored_peer_id,
585                "Received response from different peer than expected"
586            );
587        }
588        re_request_values_from_peer_except(co, state, metrics, request_id, Some(peer_id)).await?;
589    } else {
590        error!(%peer_id, %height, "Received height of invalid value for unknown request");
591    }
592
593    Ok(())
594}
595
596async fn on_value_processing_error<Ctx>(
597    co: Co<Ctx>,
598    state: &mut State<Ctx>,
599    metrics: &Metrics,
600    peer_id: PeerId,
601    height: Ctx::Height,
602) -> Result<(), Error<Ctx>>
603where
604    Ctx: Context,
605{
606    error!(%peer_id, %height, "Error while processing value");
607
608    // NOTE: We do not update the peer score here, as this is an internal error
609    //       and not a failure from the peer's side.
610
611    if let Some((request_id, _)) = state.get_request_id_by(height) {
612        re_request_values_from_peer_except(co, state, metrics, request_id, None).await?;
613    } else {
614        error!(%peer_id, %height, "Received height of invalid value for unknown request");
615    }
616
617    Ok(())
618}
619
620/// Request multiple batches of values in parallel.
621async fn request_values<Ctx>(
622    co: Co<Ctx>,
623    state: &mut State<Ctx>,
624    metrics: &Metrics,
625) -> Result<(), Error<Ctx>>
626where
627    Ctx: Context,
628{
629    let max_parallel_requests = state.max_parallel_requests();
630
631    if state.pending_requests.len() >= max_parallel_requests {
632        info!(
633            max_parallel_requests,
634            pending_requests = state.pending_requests.len(),
635            "Maximum number of parallel requests reached, skipping request for values"
636        );
637
638        return Ok(());
639    };
640
641    while state.pending_requests.len() < max_parallel_requests {
642        // Find the next uncovered range starting from current sync_height
643        let initial_height = state.sync_height;
644        let range = find_next_uncovered_range_from::<Ctx>(
645            initial_height,
646            state.config.batch_size as u64,
647            &state.pending_requests,
648        );
649
650        // Get a random peer that can provide the values in the range.
651        let Some((peer, range)) = state.random_peer_with(&range) else {
652            debug!("No peer to request sync from");
653            // No connected peer reached this height yet, we can stop syncing here.
654            break;
655        };
656
657        send_and_track_request_to_peer(&co, state, metrics, peer, range).await?;
658    }
659
660    Ok(())
661}
662
663/// Request values for this specific range from a peer.
664/// Should only be used when re-requesting a partial range of values from a peer.
665async fn request_values_range<Ctx>(
666    co: Co<Ctx>,
667    state: &mut State<Ctx>,
668    metrics: &Metrics,
669    range: RangeInclusive<Ctx::Height>,
670) -> Result<(), Error<Ctx>>
671where
672    Ctx: Context,
673{
674    // NOTE: We do not perform a `max_parallel_requests` check and return here in contrast to what is done, for
675    // example, in `request_values`. This is because `request_values_range` is only called for retrieving
676    // partial responses, which means the original request is not on the wire anymore. Nevertheless,
677    // we log here because seeing this log frequently implies that we keep getting partial responses
678    // from peers and hints to potential reconfiguration.
679    let max_parallel_requests = state.max_parallel_requests();
680
681    if state.pending_requests.len() >= max_parallel_requests {
682        info!(
683            %max_parallel_requests,
684            pending_requests = %state.pending_requests.len(),
685            "Maximum number of pending requests reached when re-requesting a partial range of values"
686        );
687    };
688
689    // Get a random peer that can provide the values in the range.
690    let Some((peer, range)) = state.random_peer_with(&range) else {
691        // No connected peer reached this height yet, we can stop syncing here.
692        debug!(range = %DisplayRange(&range), "No peer to request sync from");
693        return Ok(());
694    };
695
696    send_and_track_request_to_peer(&co, state, metrics, peer, range).await?;
697
698    Ok(())
699}
700
701async fn send_and_track_request_to_peer<Ctx>(
702    co: &Co<Ctx>,
703    state: &mut State<Ctx>,
704    metrics: &Metrics,
705    peer: PeerId,
706    range: RangeInclusive<<Ctx as Context>::Height>,
707) -> Result<(), Error<Ctx>>
708where
709    Ctx: Context,
710{
711    // Send the request
712    let Some((request_id, final_range)) =
713        send_request_to_peer(co, state, metrics, range, peer).await?
714    else {
715        return Ok(()); // Request was skipped (empty range, etc.)
716    };
717
718    // Store the pending request
719    state
720        .pending_requests
721        .insert(request_id, (final_range.clone(), peer));
722
723    // Update sync_height to the next uncovered height after this range
724    let next_sync_base = final_range.end().increment();
725    state.sync_height = find_next_uncovered_height::<Ctx>(next_sync_base, &state.pending_requests);
726
727    Ok(())
728}
729
730/// Send a value request to a peer. Returns the request_id and final range if successful.
731/// The calling function is responsible for storing the request and updating state.
732async fn send_request_to_peer<Ctx>(
733    co: &Co<Ctx>,
734    state: &mut State<Ctx>,
735    metrics: &Metrics,
736    range: RangeInclusive<Ctx::Height>,
737    peer: PeerId,
738) -> Result<Option<(OutboundRequestId, RangeInclusive<Ctx::Height>)>, Error<Ctx>>
739where
740    Ctx: Context,
741{
742    if range.is_empty() {
743        debug!(%peer, "Range is empty, skipping request");
744        return Ok(None);
745    }
746
747    // Skip over any heights in the range that are not waiting for a response
748    // (meaning that they have been validated by consensus or a peer).
749    let range = state.trim_validated_heights(&range);
750
751    if range.is_empty() {
752        warn!(
753            range = %DisplayRange(&range), %peer,
754            "All values in range have been validated, skipping request"
755        );
756
757        return Ok(None);
758    }
759
760    info!(range = %DisplayRange(&range), %peer, "Requesting sync from peer");
761
762    // Send request to peer
763    let Some(request_id) = perform!(
764        co,
765        Effect::SendValueRequest(peer, ValueRequest::new(range.clone()), Default::default()),
766        Resume::ValueRequestId(id) => id,
767    ) else {
768        warn!(range = %DisplayRange(&range), %peer, "Failed to send sync request to peer");
769        return Ok(None);
770    };
771
772    metrics.value_request_sent(range.start().as_u64());
773    debug!(%request_id, range = %DisplayRange(&range), %peer, "Sent sync request to peer");
774
775    Ok(Some((request_id, range)))
776}
777
778/// Remove the pending request and re-request the batch from another peer.
779/// If `except_peer_id` is provided, the request will be re-sent to a different peer than the one that sent the original request.
780async fn re_request_values_from_peer_except<Ctx>(
781    co: Co<Ctx>,
782    state: &mut State<Ctx>,
783    metrics: &Metrics,
784    request_id: OutboundRequestId,
785    except_peer_id: Option<PeerId>,
786) -> Result<(), Error<Ctx>>
787where
788    Ctx: Context,
789{
790    info!(%request_id, except_peer_id = ?except_peer_id, "Re-requesting values from peer");
791
792    let Some((range, stored_peer_id)) = state.pending_requests.remove(&request_id.clone()) else {
793        warn!(%request_id, "Unknown request ID when re-requesting values");
794        return Ok(());
795    };
796
797    let except_peer_id = match except_peer_id {
798        Some(peer_id) if stored_peer_id == peer_id => Some(peer_id),
799        Some(peer_id) => {
800            warn!(
801                %request_id,
802                peer.actual = %peer_id,
803                peer.expected = %stored_peer_id,
804                "Received response from different peer than expected"
805            );
806
807            Some(stored_peer_id)
808        }
809        None => None,
810    };
811
812    let Some((peer, peer_range)) = state.random_peer_with_except(&range, except_peer_id) else {
813        debug!("No peer to re-request sync from");
814        // Reset the sync height to the start of the range.
815        state.sync_height = min(state.sync_height, *range.start());
816        return Ok(());
817    };
818
819    send_and_track_request_to_peer(&co, state, metrics, peer, peer_range).await?;
820
821    Ok(())
822}
823
824/// Find the next uncovered range starting from initial_height.
825///
826/// Builds a contiguous range of the specified max_size from initial_height.
827///
828/// # Assumptions
829/// - All ranges in pending_requests are disjoint (non-overlapping)
830/// - initial_height is not covered by any pending request (maintained by caller)
831///
832/// # Panics
833/// Panics if initial_height is already covered by a pending request (indicates a bug in the logic).
834///
835/// Returns the range that should be requested.
836fn find_next_uncovered_range_from<Ctx>(
837    initial_height: Ctx::Height,
838    max_range_size: u64,
839    pending_requests: &BTreeMap<OutboundRequestId, (RangeInclusive<Ctx::Height>, PeerId)>,
840) -> RangeInclusive<Ctx::Height>
841where
842    Ctx: Context,
843{
844    let max_batch_size = max(1, max_range_size);
845
846    // Find the pending request with the smallest range.start where range.end >= initial_height
847    let next_range = pending_requests
848        .values()
849        .map(|(range, _)| range)
850        .filter(|range| *range.end() >= initial_height)
851        .min_by_key(|range| range.start());
852
853    // Start with the full max_batch_size range
854    let mut end_height = initial_height.increment_by(max_batch_size - 1);
855
856    // If there's a range in pending, constrain to that boundary
857    if let Some(range) = next_range {
858        // Check if initial_height is covered by this earliest range
859        if range.contains(&initial_height) {
860            panic!(
861                "Bug: initial_height {} is already covered by a pending request. This should never happen.",
862                initial_height.as_u64()
863            );
864        }
865
866        // Constrain to the blocking boundary
867        let boundary_end = range
868            .start()
869            .decrement()
870            .expect("range.start() should be decrementable since it's > initial_height");
871        end_height = min(end_height, boundary_end);
872    }
873
874    initial_height..=end_height
875}
876
877/// Find the next height that's not covered by any pending request starting from starting_height.
878fn find_next_uncovered_height<Ctx>(
879    starting_height: Ctx::Height,
880    pending_requests: &BTreeMap<OutboundRequestId, (RangeInclusive<Ctx::Height>, PeerId)>,
881) -> Ctx::Height
882where
883    Ctx: Context,
884{
885    let mut next_height = starting_height;
886    while let Some((covered_range, _)) = pending_requests
887        .values()
888        .find(|(r, _)| r.contains(&next_height))
889    {
890        next_height = covered_range.end().increment();
891    }
892    next_height
893}
894
#[cfg(test)]
mod tests {
    use super::*;
    use arc_malachitebft_test::{Height, TestContext};
    use std::collections::BTreeMap;

    type TestPendingRequests = BTreeMap<OutboundRequestId, (RangeInclusive<Height>, PeerId)>;

    /// Builds the pending-request map used by the table-driven tests.
    ///
    /// One entry is created per `(start, end)` pair, keyed `req1`, `req2`, ...
    /// in slice order, each mapped to the inclusive height range and a freshly
    /// generated random peer.
    fn pending_requests_from(ranges: &[(u64, u64)]) -> TestPendingRequests {
        ranges
            .iter()
            .enumerate()
            .map(|(i, &(start, end))| {
                (
                    OutboundRequestId::new(format!("req{}", i + 1)),
                    (Height::new(start)..=Height::new(end), PeerId::random()),
                )
            })
            .collect()
    }

    // Test case structures for table-driven tests

    /// A scenario in which `find_next_uncovered_range_from` is expected to
    /// return the range `expected_start..=expected_end`.
    struct RangeTestCase {
        /// Label reported in the assertion message on failure.
        name: &'static str,
        /// Height passed as the start of the search.
        initial_height: u64,
        /// Maximum size passed to the function under test.
        max_size: u64,
        pending_ranges: &'static [(u64, u64)], // (start, end) pairs
        expected_start: u64,
        expected_end: u64,
    }

    /// A scenario in which `find_next_uncovered_range_from` is expected to panic
    /// with a message containing `expected_panic_msg`.
    struct PanicTestCase {
        /// Label reported in the assertion message on failure.
        name: &'static str,
        /// Height passed as the start of the search.
        initial_height: u64,
        /// Maximum size passed to the function under test.
        max_size: u64,
        pending_ranges: &'static [(u64, u64)], // (start, end) pairs
        /// Substring that must appear in the panic message.
        expected_panic_msg: &'static str,
    }

    /// A scenario in which `find_next_uncovered_height` is expected to return
    /// `expected_height`.
    struct HeightTestCase {
        /// Label reported in the assertion message on failure.
        name: &'static str,
        /// Height passed as the start of the search.
        initial_height: u64,
        pending_ranges: &'static [(u64, u64)], // (start, end) pairs
        expected_height: u64,
    }

    // Tests for find_next_uncovered_range_from function

    /// Table-driven check of the ranges returned when the initial height is not
    /// covered by any pending request.
    #[test]
    fn test_find_next_uncovered_range_from_table() {
        let test_cases = [
            RangeTestCase {
                name: "no pending requests",
                initial_height: 10,
                max_size: 5,
                pending_ranges: &[],
                expected_start: 10,
                expected_end: 14,
            },
            RangeTestCase {
                name: "max size one",
                initial_height: 10,
                max_size: 1,
                pending_ranges: &[],
                expected_start: 10,
                expected_end: 10,
            },
            RangeTestCase {
                name: "with blocking request",
                initial_height: 10,
                max_size: 5,
                pending_ranges: &[(12, 15)],
                expected_start: 10,
                expected_end: 11,
            },
            RangeTestCase {
                name: "zero max size becomes one",
                initial_height: 10,
                max_size: 0, // Should be treated as 1
                pending_ranges: &[],
                expected_start: 10,
                expected_end: 10,
            },
            RangeTestCase {
                name: "range starts immediately after",
                initial_height: 15,
                max_size: 5,
                pending_ranges: &[(16, 20)],
                expected_start: 15,
                expected_end: 15, // boundary_end = 16 - 1 = 15, min(19, 15) = 15
            },
            RangeTestCase {
                name: "height zero with range starting at one",
                initial_height: 0,
                max_size: 3,
                pending_ranges: &[(1, 5)],
                expected_start: 0,
                expected_end: 0, // boundary_end = 1 - 1 = 0, min(2, 0) = 0
            },
            RangeTestCase {
                name: "sync height just at range end",
                initial_height: 11,
                max_size: 4,
                pending_ranges: &[(5, 10)],
                expected_start: 11,
                expected_end: 14, // max_end = 11 + 4 - 1 = 14
            },
            RangeTestCase {
                name: "fill gap between ranges",
                initial_height: 12,
                max_size: 6,
                pending_ranges: &[(5, 10), (20, 25)],
                expected_start: 12,
                expected_end: 17, // max_end = 12 + 6 - 1 = 17, boundary_end = 20 - 1 = 19, min(17, 19) = 17
            },
        ];

        for case in test_cases {
            let pending_requests = pending_requests_from(case.pending_ranges);

            let result = find_next_uncovered_range_from::<TestContext>(
                Height::new(case.initial_height),
                case.max_size,
                &pending_requests,
            );

            assert_eq!(
                result,
                Height::new(case.expected_start)..=Height::new(case.expected_end),
                "Test case '{}' failed",
                case.name
            );
        }
    }

    // Panic tests for find_next_uncovered_range_from function

    /// Table-driven check that calling the function with an initial height that
    /// is already covered by a pending request panics with the expected message.
    #[test]
    fn test_find_next_uncovered_range_from_panic_cases() {
        let test_cases = [
            PanicTestCase {
                name: "sync height covered",
                initial_height: 12,
                max_size: 3,
                pending_ranges: &[(10, 15)],
                expected_panic_msg:
                    "Bug: initial_height 12 is already covered by a pending request",
            },
            PanicTestCase {
                name: "initial height equals range start",
                initial_height: 15,
                max_size: 5,
                pending_ranges: &[(15, 20)],
                expected_panic_msg:
                    "Bug: initial_height 15 is already covered by a pending request",
            },
            PanicTestCase {
                name: "sync height equals range end",
                initial_height: 15,
                max_size: 3,
                pending_ranges: &[(10, 15)],
                expected_panic_msg:
                    "Bug: initial_height 15 is already covered by a pending request",
            },
            PanicTestCase {
                name: "multiple consecutive blocks",
                initial_height: 16,
                max_size: 3,
                pending_ranges: &[(10, 15), (16, 20)],
                expected_panic_msg:
                    "Bug: initial_height 16 is already covered by a pending request",
            },
            PanicTestCase {
                name: "sync height zero with range starting at zero",
                initial_height: 0,
                max_size: 3,
                pending_ranges: &[(0, 5)],
                expected_panic_msg: "Bug: initial_height 0 is already covered by a pending request",
            },
        ];

        for case in test_cases {
            let pending_requests = pending_requests_from(case.pending_ranges);

            let result = std::panic::catch_unwind(|| {
                find_next_uncovered_range_from::<TestContext>(
                    Height::new(case.initial_height),
                    case.max_size,
                    &pending_requests,
                )
            });

            let panic_value = match result {
                Ok(_) => panic!("Test case '{}' should have panicked but didn't", case.name),
                Err(payload) => payload,
            };

            // A panic payload is a `String` for formatted messages and a `&str`
            // for literal ones. Any other payload type is reported as a failure
            // rather than silently skipping the message check.
            let panic_msg: &str = if let Some(msg) = panic_value.downcast_ref::<String>() {
                msg.as_str()
            } else if let Some(msg) = panic_value.downcast_ref::<&str>() {
                *msg
            } else {
                panic!(
                    "Test case '{}' panicked with a non-string payload",
                    case.name
                )
            };

            assert!(
                panic_msg.contains(case.expected_panic_msg),
                "Test case '{}' panicked with wrong message. Expected: '{}', Got: '{}'",
                case.name,
                case.expected_panic_msg,
                panic_msg
            );
        }
    }

    // Tests for find_next_uncovered_height function

    /// Table-driven check of the height returned for various layouts of pending
    /// ranges around the starting height.
    #[test]
    fn test_find_next_uncovered_height_table() {
        let test_cases = [
            HeightTestCase {
                name: "no pending requests",
                initial_height: 10,
                pending_ranges: &[],
                expected_height: 10,
            },
            HeightTestCase {
                name: "starting height covered",
                initial_height: 12,
                pending_ranges: &[(10, 15)],
                expected_height: 16, // Should return the height after the covered range
            },
            HeightTestCase {
                name: "starting height match request start",
                initial_height: 10,
                pending_ranges: &[(10, 15)],
                expected_height: 16, // Should return the height after the covered range
            },
            HeightTestCase {
                name: "starting height match request end",
                initial_height: 15,
                pending_ranges: &[(10, 15)],
                expected_height: 16, // Should return the height after the covered range
            },
            HeightTestCase {
                name: "starting height just before request start",
                initial_height: 9,
                pending_ranges: &[(10, 15)],
                expected_height: 9, // Should return the starting height
            },
            HeightTestCase {
                name: "multiple consecutive ranges",
                initial_height: 10,
                pending_ranges: &[(10, 15), (16, 20)],
                expected_height: 21, // Should skip over all consecutive ranges
            },
            HeightTestCase {
                name: "multiple consecutive ranges with a gap",
                initial_height: 10,
                pending_ranges: &[(10, 15), (16, 20), (24, 30)],
                expected_height: 21, // Should skip over consecutive ranges but stop at gap
            },
            HeightTestCase {
                name: "starting height covered multiple",
                initial_height: 12,
                pending_ranges: &[(10, 15), (15, 20)],
                expected_height: 21, // Should return the height after all covered ranges
            },
        ];

        for case in test_cases {
            let pending_requests = pending_requests_from(case.pending_ranges);

            let result = find_next_uncovered_height::<TestContext>(
                Height::new(case.initial_height),
                &pending_requests,
            );

            assert_eq!(
                result,
                Height::new(case.expected_height),
                "Test case '{}' failed",
                case.name
            );
        }
    }

    /// Checks which request ranges `validate_request_range` accepts for a tip
    /// height of 20 and a batch size of 5.
    #[test]
    fn test_validate_request_range() {
        let validate = validate_request_range::<TestContext>;

        let tip_height = Height::new(20);
        let batch_size = 5;

        // Valid range
        let range = Height::new(15)..=Height::new(19);
        assert!(validate(&range, tip_height, batch_size));

        // Start greater than end
        let range = Height::new(18)..=Height::new(17);
        assert!(!validate(&range, tip_height, batch_size));

        // Start greater than tip height
        let range = Height::new(21)..=Height::new(25);
        assert!(!validate(&range, tip_height, batch_size));

        // Exceeds batch size
        let range = Height::new(10)..=Height::new(16);
        assert!(!validate(&range, tip_height, batch_size));

        // Widest possible range: must be rejected, and (per the original
        // "no overflow" note) presumably without the length computation
        // overflowing.
        let range = Height::new(0)..=Height::new(u64::MAX);
        assert!(!validate(&range, tip_height, batch_size));
    }

    /// Checks that `clamp_request_range` leaves in-bounds ranges untouched and
    /// caps the end of a range at the tip height.
    #[test]
    fn test_clamp_request_range() {
        let clamp = clamp_request_range::<TestContext>;

        let tip_height = Height::new(20);

        // Range within tip height
        let range = Height::new(15)..=Height::new(18);
        let clamped = clamp(&range, tip_height);
        assert_eq!(clamped, range);

        // Range exceeding tip height
        let range = Height::new(18)..=Height::new(25);
        let clamped = clamp(&range, tip_height);
        assert_eq!(clamped, Height::new(18)..=tip_height);

        // Range starting at tip height
        let range = tip_height..=Height::new(25);
        let clamped = clamp(&range, tip_height);
        assert_eq!(clamped, tip_height..=tip_height);
    }
}