1use std::cmp::Ordering;
2use std::collections::HashMap;
3use std::ops::RangeInclusive;
4use std::time::Duration;
5
6use async_trait::async_trait;
7use bytes::Bytes;
8use bytesize::ByteSize;
9use derive_where::derive_where;
10use eyre::eyre;
11use ractor::{Actor, ActorProcessingErr, ActorRef};
12use rand::SeedableRng;
13use tokio::task::JoinHandle;
14use tracing::{debug, error, info, warn, Instrument};
15
16use malachitebft_codec as codec;
17use malachitebft_core_consensus::util::bounded_queue::BoundedQueue;
18use malachitebft_core_consensus::PeerId;
19use malachitebft_core_types::utils::height::DisplayRange;
20use malachitebft_core_types::ValueResponse as CoreValueResponse;
21use malachitebft_core_types::{CommitCertificate, Context};
22use malachitebft_sync::{
23 self as sync, HeightStartType, InboundRequestId, OutboundRequestId, RawDecidedValue, Request,
24 Response, Resumable,
25};
26
27use crate::consensus::{ConsensusMsg, ConsensusRef};
28use crate::host::{HostMsg, HostRef};
29use crate::network::{NetworkEvent, NetworkMsg, NetworkRef, Status};
30use crate::util::ticker::ticker;
31use crate::util::timers::{TimeoutElapsed, TimerScheduler};
32
/// Codec requirements for the sync actor.
///
/// A type is a `SyncCodec<Ctx>` when it can encode/decode sync status
/// messages, sync requests and sync responses, and can report the encoded
/// length of a response (used to enforce the maximum response size).
pub trait SyncCodec<Ctx>
where
    Ctx: Context,
    Self: codec::Codec<sync::Status<Ctx>>,
    Self: codec::Codec<sync::Request<Ctx>>,
    Self: codec::Codec<sync::Response<Ctx>>,
    Self: codec::HasEncodedLen<sync::Response<Ctx>>,
{
}
48
/// Blanket implementation: any codec satisfying the individual bounds
/// automatically implements [`SyncCodec`].
impl<Ctx, Codec> SyncCodec<Ctx> for Codec
where
    Ctx: Context,
    Codec: codec::Codec<sync::Status<Ctx>>,
    Codec: codec::Codec<sync::Request<Ctx>>,
    Codec: codec::Codec<sync::Response<Ctx>>,
    Codec: codec::HasEncodedLen<sync::Response<Ctx>>,
{
}
58
/// Timeouts managed by the sync actor's timer scheduler.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum Timeout {
    /// An outbound sync request has not received a response in time.
    Request(OutboundRequestId),
}
63
/// Timer scheduler specialized to the sync actor's timeouts.
type Timers = TimerScheduler<Timeout>;

/// Reference to the sync actor.
pub type SyncRef<Ctx> = ActorRef<Msg<Ctx>>;
/// Alias for the sync actor's message type.
pub type SyncMsg<Ctx> = Msg<Ctx>;
68
/// A decided value together with the evidence of its decision:
/// the height, the commit certificate, and the raw value bytes.
#[derive_where(Clone, Debug)]
pub struct RawDecidedBlock<Ctx: Context> {
    /// Height at which the value was decided.
    pub height: Ctx::Height,
    /// Commit certificate proving the decision.
    pub certificate: CommitCertificate<Ctx>,
    /// Raw encoded bytes of the decided value.
    pub value_bytes: Bytes,
}
75
/// An outbound sync request that is still awaiting a response from a peer.
#[derive_where(Clone, Debug)]
pub struct InflightRequest<Ctx: Context> {
    /// Peer the request was sent to.
    pub peer_id: PeerId,
    /// Identifier assigned to the outbound request.
    pub request_id: OutboundRequestId,
    /// The request itself, kept around so it can be reported back to the
    /// sync state machine if the request times out.
    pub request: Request<Ctx>,
}

/// All currently in-flight requests, keyed by request id.
pub type InflightRequests<Ctx> = HashMap<OutboundRequestId, InflightRequest<Ctx>>;
84
/// Messages handled by the sync actor.
#[derive_where(Clone, Debug)]
pub enum Msg<Ctx: Context> {
    /// Periodic tick from the status ticker; triggers a status broadcast.
    Tick,

    /// An event forwarded from the network actor.
    NetworkEvent(NetworkEvent<Ctx>),

    /// Consensus has decided on a value at the given height.
    Decided(Ctx::Height),

    /// Consensus has started the given height (possibly restarting it).
    StartedHeight(Ctx::Height, HeightStartType),

    /// The host has returned the decided values for the given range,
    /// in reply to a peer's value request.
    GotDecidedValues(
        InboundRequestId,
        RangeInclusive<Ctx::Height>,
        Vec<RawDecidedValue<Ctx>>,
    ),

    /// A scheduled timeout has fired.
    TimeoutElapsed(TimeoutElapsed<Timeout>),

    /// A value received via sync from the given peer, for the given height,
    /// turned out to be invalid.
    InvalidValue(PeerId, Ctx::Height),

    /// An error occurred while processing a value received via sync from the
    /// given peer, for the given height.
    ValueProcessingError(PeerId, Ctx::Height),
}
117
118impl<Ctx: Context> From<NetworkEvent<Ctx>> for Msg<Ctx> {
119 fn from(event: NetworkEvent<Ctx>) -> Self {
120 Msg::NetworkEvent(event)
121 }
122}
123
124impl<Ctx: Context> From<TimeoutElapsed<Timeout>> for Msg<Ctx> {
125 fn from(elapsed: TimeoutElapsed<Timeout>) -> Self {
126 Msg::TimeoutElapsed(elapsed)
127 }
128}
129
/// Configuration parameters for the sync actor.
#[derive(Debug)]
pub struct Params {
    /// Interval at which to broadcast our status to peers.
    ///
    /// A zero interval disables the periodic ticker; the status is then
    /// broadcast whenever consensus starts a new height instead.
    pub status_update_interval: Duration,

    /// How long to wait for a response before an outbound sync request
    /// times out.
    pub request_timeout: Duration,
}
141
142impl Default for Params {
143 fn default() -> Self {
144 Self {
145 status_update_interval: Duration::from_secs(5),
146 request_timeout: Duration::from_secs(10),
147 }
148 }
149}
150
/// A value received via sync for a height ahead of consensus, buffered in
/// the sync queue until consensus reaches that height.
#[derive_where(Clone, Debug)]
struct BufferedValue<Ctx: Context> {
    // The outbound request this value arrived in response to; used to evict
    // buffered values when that request is later invalidated.
    request_id: OutboundRequestId,
    // The value response to forward to consensus once its height is reached.
    value: CoreValueResponse<Ctx>,
}
157
158impl<Ctx: Context> BufferedValue<Ctx> {
159 fn new(request_id: OutboundRequestId, value: CoreValueResponse<Ctx>) -> Self {
160 Self { request_id, value }
161 }
162}
163
/// Bounded queue of values buffered for heights ahead of consensus, keyed by height.
type SyncQueue<Ctx> = BoundedQueue<<Ctx as Context>::Height, BufferedValue<Ctx>>;
166
167enum StatusUpdateMode {
169 Interval(JoinHandle<()>), OnStartedHeight,
174}
175
/// Runtime state of the sync actor.
pub struct State<Ctx: Context> {
    // Core sync protocol state machine.
    sync: sync::State<Ctx>,

    // Scheduler for request timeouts.
    timers: Timers,

    // Outbound requests still awaiting a response.
    inflight: InflightRequests<Ctx>,

    // Values buffered for heights ahead of consensus.
    sync_queue: SyncQueue<Ctx>,

    // How status updates are broadcast (holds the ticker task, if any).
    status_update_mode: StatusUpdateMode,
}
192
/// Mutable borrows of the subset of [`State`] needed while handling effects,
/// split out so the core sync state (`State::sync`) can be borrowed mutably
/// by the state machine at the same time.
struct HandlerState<'a, Ctx: Context> {
    // Scheduler for request timeouts.
    timers: &'a mut Timers,
    // Outbound requests still awaiting a response.
    inflight: &'a mut InflightRequests<Ctx>,
    // Values buffered for heights ahead of consensus.
    sync_queue: &'a mut SyncQueue<Ctx>,
    // Snapshot of the height consensus is currently working on.
    consensus_height: Ctx::Height,
}
204
/// The sync actor: keeps the node's decided values in sync with its peers by
/// exchanging status updates, value requests and value responses, and by
/// forwarding synced values to the consensus actor.
#[allow(dead_code)]
pub struct Sync<Ctx, Codec>
where
    Ctx: Context,
    Codec: SyncCodec<Ctx>,
{
    // Consensus context.
    ctx: Ctx,
    // Reference to the network actor.
    network: NetworkRef<Ctx>,
    // Reference to the host actor.
    host: HostRef<Ctx>,
    // Reference to the consensus actor.
    consensus: ConsensusRef<Ctx>,
    // Configuration parameters (status interval, request timeout).
    params: Params,
    // Codec used to encode/decode sync messages and size responses.
    sync_codec: Codec,
    // Core sync protocol configuration.
    sync_config: sync::Config,
    // Sync-related metrics.
    metrics: sync::Metrics,
    // Tracing span under which this actor's events are recorded.
    span: tracing::Span,
}
221
impl<Ctx, Codec> Sync<Ctx, Codec>
where
    Ctx: Context,
    Codec: SyncCodec<Ctx>,
{
    /// Create a new sync actor with the given dependencies and configuration.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        ctx: Ctx,
        network: NetworkRef<Ctx>,
        host: HostRef<Ctx>,
        consensus: ConsensusRef<Ctx>,
        params: Params,
        sync_codec: Codec,
        sync_config: sync::Config,
        metrics: sync::Metrics,
        span: tracing::Span,
    ) -> Self {
        Self {
            ctx,
            network,
            host,
            consensus,
            params,
            sync_codec,
            sync_config,
            metrics,
            span,
        }
    }

    /// Create and spawn the sync actor, returning a reference to it.
    #[allow(clippy::too_many_arguments)]
    pub async fn spawn(
        ctx: Ctx,
        network: NetworkRef<Ctx>,
        host: HostRef<Ctx>,
        consensus: ConsensusRef<Ctx>,
        params: Params,
        sync_codec: Codec,
        sync_config: sync::Config,
        metrics: sync::Metrics,
        span: tracing::Span,
    ) -> Result<SyncRef<Ctx>, ractor::SpawnErr> {
        let actor = Self::new(
            ctx,
            network,
            host,
            consensus,
            params,
            sync_codec,
            sync_config,
            metrics,
            span,
        );
        let (actor_ref, _) = Actor::spawn(None, actor, ()).await?;
        Ok(actor_ref)
    }

    /// Feed an input into the core sync state machine, handling each effect
    /// it yields via [`Self::handle_effect`].
    async fn process_input(
        &self,
        myself: &ActorRef<Msg<Ctx>>,
        state: &mut State<Ctx>,
        input: sync::Input<Ctx>,
    ) -> Result<(), ActorProcessingErr> {
        // Borrow the pieces of state the effect handler needs and snapshot
        // the consensus height, so `state.sync` can simultaneously be
        // borrowed mutably by the state machine below.
        let mut handler_state = HandlerState {
            timers: &mut state.timers,
            inflight: &mut state.inflight,
            sync_queue: &mut state.sync_queue,
            consensus_height: state.sync.consensus_height,
        };

        malachitebft_sync::process!(
            input: input,
            state: &mut state.sync,
            metrics: &self.metrics,
            with: effect => {
                self.handle_effect(
                    myself,
                    &mut handler_state,
                    effect,
                ).await
            }
        )
    }

    /// Ask the host for the earliest height available in its history.
    async fn get_history_min_height(&self) -> Result<Ctx::Height, ActorProcessingErr> {
        ractor::call!(self.host, |reply_to| HostMsg::GetHistoryMinHeight {
            reply_to
        })
        .map_err(|e| eyre!("Failed to get earliest history height: {e:?}").into())
    }

    /// Perform the side effect requested by the core sync state machine and
    /// resume the state machine with the outcome.
    async fn handle_effect(
        &self,
        myself: &ActorRef<Msg<Ctx>>,
        state: &mut HandlerState<'_, Ctx>,
        effect: sync::Effect<Ctx>,
    ) -> Result<sync::Resume<Ctx>, ActorProcessingErr> {
        use sync::Effect;

        match effect {
            // Broadcast our own status (tip height + earliest available height).
            Effect::BroadcastStatus(height, r) => {
                let history_min_height = self.get_history_min_height().await?;

                self.network.cast(NetworkMsg::BroadcastStatus(Status::new(
                    height,
                    history_min_height,
                )))?;

                Ok(r.resume_with(()))
            }

            // Send a value request to a peer, arm its timeout, and track it
            // as in-flight. Resumes with the request id, or `None` if the
            // network layer could not be reached.
            Effect::SendValueRequest(peer_id, value_request, r) => {
                let request = Request::ValueRequest(value_request);
                let result = ractor::call!(self.network, |reply_to| {
                    NetworkMsg::OutgoingRequest(peer_id, request.clone(), reply_to)
                });

                match result {
                    Ok(request_id) => {
                        let request_id = OutboundRequestId::new(request_id);

                        state.timers.start_timer(
                            Timeout::Request(request_id.clone()),
                            self.params.request_timeout,
                        );

                        state.inflight.insert(
                            request_id.clone(),
                            InflightRequest {
                                peer_id,
                                request_id: request_id.clone(),
                                request,
                            },
                        );

                        info!(%peer_id, %request_id, "Sent value request to peer");

                        Ok(r.resume_with(Some(request_id)))
                    }
                    Err(e) => {
                        error!("Failed to send request to network layer: {e}");
                        Ok(r.resume_with(None))
                    }
                }
            }

            // Answer a peer's value request.
            Effect::SendValueResponse(request_id, value_response, r) => {
                let response = Response::ValueResponse(value_response);
                self.network
                    .cast(NetworkMsg::OutgoingResponse(request_id, response))?;

                Ok(r.resume_with(()))
            }

            // Fetch the decided values for the requested range from the host;
            // the reply is forwarded back to this actor as `Msg::GotDecidedValues`.
            Effect::GetDecidedValues(request_id, range, r) => {
                self.host.call_and_forward(
                    {
                        let range = range.clone();
                        |reply_to| HostMsg::GetDecidedValues { range, reply_to }
                    },
                    myself,
                    |values| Msg::<Ctx>::GotDecidedValues(request_id, range, values),
                    None,
                )?;

                Ok(r.resume_with(()))
            }

            // Triage a value response received from a peer.
            Effect::ProcessValueResponse(peer_id, request_id, response, r) => {
                self.process_value_response(state, peer_id, request_id, response);
                Ok(r.resume_with(()))
            }
        }
    }

    /// Triage each value in a response relative to the current consensus
    /// height: values for past heights are ignored, values for future heights
    /// are buffered in the sync queue, and values for the current height are
    /// forwarded to consensus immediately.
    fn process_value_response(
        &self,
        state: &mut HandlerState<'_, Ctx>,
        peer_id: PeerId,
        request_id: OutboundRequestId,
        response: sync::ValueResponse<Ctx>,
    ) {
        let consensus_height = state.consensus_height;
        let mut ignored = Vec::new();
        let mut buffered = Vec::new();

        for raw_value in response.values {
            let height = raw_value.height();
            let value = raw_value.to_core(peer_id);

            match height.cmp(&consensus_height) {
                // Below the consensus height: nothing to do, just record it
                // for the summary log below.
                Ordering::Less => {
                    ignored.push(height);
                }

                // Ahead of consensus: buffer until consensus reaches it
                // (the queue is bounded, so this can fail when full).
                Ordering::Greater => {
                    let buffered_value = BufferedValue::new(request_id.clone(), value);
                    if state.sync_queue.push(height, buffered_value) {
                        buffered.push(height);
                    } else {
                        warn!(%peer_id, %request_id, %height, "Failed to buffer sync response, queue is full");
                    }
                }

                // Exactly the height consensus is working on: forward now.
                Ordering::Equal => {
                    debug!(%peer_id, %request_id, %height, "Processing value for current consensus height");

                    if let Err(e) = self
                        .consensus
                        .cast(ConsensusMsg::ProcessSyncResponse(value))
                    {
                        error!("Failed to forward value response to consensus: {e}");
                    }
                }
            }
        }

        self.metrics
            .sync_queue_updated(state.sync_queue.len(), state.sync_queue.size());

        if !ignored.is_empty() {
            debug!(
                %peer_id, %request_id, ?ignored,
                "Ignored {} values for already decided heights", ignored.len()
            );
        }

        if !buffered.is_empty() {
            debug!(
                %peer_id, %request_id, ?buffered,
                "Buffered {} values for heights ahead of consensus", buffered.len()
            );
        }
    }

    /// Dispatch a message received by the actor.
    async fn handle_msg(
        &self,
        myself: ActorRef<Msg<Ctx>>,
        msg: Msg<Ctx>,
        state: &mut State<Ctx>,
    ) -> Result<(), ActorProcessingErr> {
        match msg {
            // Periodic ticker fired: broadcast our status.
            Msg::Tick => {
                self.process_input(&myself, state, sync::Input::SendStatusUpdate)
                    .await?;
            }

            Msg::NetworkEvent(NetworkEvent::PeerDisconnected(peer_id)) => {
                info!(%peer_id, "Disconnected from peer");

                if state.sync.peers.remove(&peer_id).is_some() {
                    debug!(%peer_id, "Removed disconnected peer");
                }
            }

            // A peer broadcast its status: feed it to the state machine.
            Msg::NetworkEvent(NetworkEvent::Status(peer_id, status)) => {
                let status = sync::Status {
                    peer_id,
                    tip_height: status.tip_height,
                    history_min_height: status.history_min_height,
                };

                self.process_input(&myself, state, sync::Input::Status(status))
                    .await?;
            }

            // A peer is requesting decided values from us.
            Msg::NetworkEvent(NetworkEvent::SyncRequest(request_id, from, request)) => {
                match request {
                    Request::ValueRequest(value_request) => {
                        self.process_input(
                            &myself,
                            state,
                            sync::Input::ValueRequest(request_id, from, value_request),
                        )
                        .await?;
                    }
                };
            }

            // A peer answered one of our requests.
            Msg::NetworkEvent(NetworkEvent::SyncResponse(request_id, peer, response)) => {
                state.timers.cancel(&Timeout::Request(request_id.clone()));

                // Only process responses to requests we actually have in
                // flight; anything else is stale or unsolicited.
                if state.inflight.remove(&request_id).is_none() {
                    debug!(%request_id, %peer, "Received response for unknown request");

                    return Ok(());
                }

                let response = response.map(|resp| match resp {
                    Response::ValueResponse(value_response) => value_response,
                });

                self.process_input(
                    &myself,
                    state,
                    sync::Input::ValueResponse(request_id, peer, response),
                )
                .await?;
            }

            // All other network events are of no interest to this actor.
            Msg::NetworkEvent(_) => {
            }

            Msg::StartedHeight(height, restart) => {
                // On a restart of the same height, drop everything buffered
                // so far (presumably it can no longer be trusted — the core
                // state machine is told about the restart below).
                if restart.is_restart() {
                    state.sync_queue.clear();
                    self.metrics.sync_queue_updated(0, 0);
                }

                self.process_input(&myself, state, sync::Input::StartedHeight(height, restart))
                    .await?;

                // Without a ticker, starting a new height is what triggers
                // our status broadcast.
                if let StatusUpdateMode::OnStartedHeight = &state.status_update_mode {
                    self.process_input(&myself, state, sync::Input::SendStatusUpdate)
                        .await?;
                }

                // Flush any values previously buffered for this height to
                // consensus, stopping at the first delivery failure.
                for buffered in state.sync_queue.shift_and_take(&height) {
                    if let Err(e) = self
                        .consensus
                        .cast(ConsensusMsg::ProcessSyncResponse(buffered.value))
                    {
                        error!("Failed to forward buffered sync response to consensus: {e}");
                        break;
                    }
                }

                self.metrics
                    .sync_queue_heights
                    .set(state.sync_queue.len() as i64);
                self.metrics
                    .sync_queue_size
                    .set(state.sync_queue.size() as i64);
            }

            Msg::Decided(height) => {
                self.process_input(&myself, state, sync::Input::Decided(height))
                    .await?;
            }

            // The host returned the decided values for a peer's request.
            Msg::GotDecidedValues(request_id, range, mut values) => {
                debug!(
                    %request_id,
                    range = %DisplayRange(&range),
                    values_count = values.len(),
                    "Processing decided values from host"
                );

                // Trim the response so its encoded size stays within the
                // configured limit before handing it to the state machine.
                let max_response_size = ByteSize::b(self.sync_config.max_response_size as u64);
                truncate_values_to_size_limit(&mut values, max_response_size, &self.sync_codec);

                self.process_input(
                    &myself,
                    state,
                    sync::Input::GotDecidedValues(request_id, range, values),
                )
                .await?;
            }

            Msg::InvalidValue(peer, height) => {
                // Evict every buffered value that came from the same request
                // as the invalid one before informing the state machine.
                if let Some((request_id, _)) = state.sync.get_request_id_by(height) {
                    let removed = state.sync_queue.retain(|_, bv| bv.request_id != request_id);

                    if removed > 0 {
                        debug!(
                            %peer, %height, %request_id, removed,
                            "Removed buffered values from invalidated request"
                        );
                        self.metrics
                            .sync_queue_updated(state.sync_queue.len(), state.sync_queue.size());
                    }
                }

                self.process_input(&myself, state, sync::Input::InvalidValue(peer, height))
                    .await?
            }

            Msg::ValueProcessingError(peer, height) => {
                self.process_input(
                    &myself,
                    state,
                    sync::Input::ValueProcessingError(peer, height),
                )
                .await?
            }

            Msg::TimeoutElapsed(elapsed) => {
                // Ignore timers that were already cancelled or superseded.
                let Some(timeout) = state.timers.intercept_timer_msg(elapsed) else {
                    return Ok(());
                };

                info!(?timeout, "Timeout elapsed");

                match timeout {
                    Timeout::Request(request_id) => {
                        if let Some(inflight) = state.inflight.remove(&request_id) {
                            self.process_input(
                                &myself,
                                state,
                                sync::Input::SyncRequestTimedOut(
                                    request_id,
                                    inflight.peer_id,
                                    inflight.request,
                                ),
                            )
                            .await?;
                        } else {
                            debug!(%request_id, "Timeout for unknown request");
                        }
                    }
                }
            }
        }

        Ok(())
    }
}
666
667fn status_update_mode<Ctx, R>(
668 interval: Duration,
669 sync: &ActorRef<Msg<Ctx>>,
670 rng: &mut R,
671) -> StatusUpdateMode
672where
673 Ctx: Context,
674 R: rand::Rng,
675{
676 if interval == Duration::ZERO {
677 info!("Using status update mode: OnStartedHeight");
678 StatusUpdateMode::OnStartedHeight
679 } else {
680 info!("Using status update mode: Interval");
681
682 const ADJ_RATE: f64 = 0.01;
684 let adjustment = rng.gen_range(-ADJ_RATE..=ADJ_RATE);
685
686 let ticker = tokio::spawn(
687 ticker(interval, sync.clone(), adjustment, || Msg::Tick).in_current_span(),
688 );
689
690 StatusUpdateMode::Interval(ticker)
691 }
692}
693
694fn truncate_values_to_size_limit<Ctx, Codec>(
695 values: &mut Vec<RawDecidedValue<Ctx>>,
696 max_response_size: ByteSize,
697 codec: &Codec,
698) where
699 Ctx: Context,
700 Codec: SyncCodec<Ctx>,
701{
702 let mut current_size = ByteSize::b(0);
703 let mut keep_count = 0;
704
705 for value in values.iter() {
706 let height = value.certificate.height;
707
708 let value_response =
709 Response::ValueResponse(sync::ValueResponse::new(height, vec![value.clone()]));
710
711 let value_size = match codec.encoded_len(&value_response) {
712 Ok(size) => ByteSize::b(size as u64),
713 Err(e) => {
714 error!("Failed to get response size for value, stopping at height {height}: {e}");
715 break;
716 }
717 };
718
719 if current_size + value_size > max_response_size {
720 warn!(
721 %max_response_size, %current_size, %value_size,
722 "Maximum size limit would be exceeded, stopping at height {height}"
723 );
724 break;
725 }
726
727 current_size += value_size;
728 keep_count += 1;
729 }
730
731 values.truncate(keep_count);
733}
734
#[async_trait]
impl<Ctx, Codec> Actor for Sync<Ctx, Codec>
where
    Ctx: Context,
    Codec: SyncCodec<Ctx>,
{
    type Msg = Msg<Ctx>;
    type State = State<Ctx>;
    type Arguments = ();

    /// Subscribe to network events, pick the status update mode (spawning
    /// the ticker task if needed), and build the initial actor state.
    async fn pre_start(
        &self,
        myself: ActorRef<Self::Msg>,
        _args: Self::Arguments,
    ) -> Result<Self::State, ActorProcessingErr> {
        self.network
            .cast(NetworkMsg::Subscribe(Box::new(myself.clone())))?;

        let mut rng = Box::new(rand::rngs::StdRng::from_entropy());

        let status_update_mode =
            status_update_mode(self.params.status_update_interval, &myself, &mut rng);

        // Queue capacity: 2 x parallel_requests x batch_size.
        let queue_capacity = 2 * self.sync_config.parallel_requests * self.sync_config.batch_size;

        Ok(State {
            sync: sync::State::new(rng, self.sync_config),
            timers: Timers::new(Box::new(myself.clone())),
            inflight: HashMap::new(),
            sync_queue: SyncQueue::new(queue_capacity),
            status_update_mode,
        })
    }

    /// Handle one message, logging (but not propagating) any error so that a
    /// failure while handling a single message does not stop the actor.
    #[tracing::instrument(
        name = "sync",
        parent = &self.span,
        skip_all,
        fields(
            tip_height = %state.sync.tip_height,
            sync_height = %state.sync.sync_height,
        ),
    )]
    async fn handle(
        &self,
        myself: ActorRef<Self::Msg>,
        msg: Self::Msg,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        if let Err(e) = self.handle_msg(myself, msg, state).await {
            error!("Error handling message: {e:?}");
        }

        Ok(())
    }

    /// Abort the status ticker task, if one was spawned.
    async fn post_stop(
        &self,
        _myself: ActorRef<Self::Msg>,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        if let StatusUpdateMode::Interval(ticker) = &state.status_update_mode {
            ticker.abort();
        }

        Ok(())
    }
}
804}